!261 xdevice用例上线

Merge pull request !261 from liyanlin02/master
This commit is contained in:
openharmony_ci 2024-01-19 01:08:38 +00:00 committed by Gitee
commit 067f594a19
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
66 changed files with 2722 additions and 10 deletions

View File

@ -110,7 +110,7 @@ class liteOsUpgrade_RK3568(BaseApp):
mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py")
archive_path = os.path.join(version_savepath)
if not self.check_devices_mode():
check_devices_cmd = "hdc_std list targets"
check_devices_cmd = "hdc list targets"
f = send_times(check_devices_cmd)
logger.info(f)
if not f or "Empty" in f:
@ -180,9 +180,11 @@ class liteOsUpgrade_RK3568(BaseApp):
if upgrade_test_type == "null":
return True
screenshot_path = os.path.join(local_image_path, "screenshot")
resource_path = os.path.join(screenshot_path, "resource")
resource_path = os.path.join(screenshot_path, 'xdevice_smoke')
logger.info(resource_path)
py_path = os.path.join(resource_path, "capturescreentest.py")
# py_path = os.path.join(screenshot_path, "main.py")
py_path = "main.py"
new_report_path = os.path.join(report_path, "result")
logger.info(new_report_path)
time_sleep = random.randint(1, 5)
@ -200,7 +202,12 @@ class liteOsUpgrade_RK3568(BaseApp):
return True
if not upgrade_test_type or upgrade_test_type == "smoke_test":
test_return = cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url)
# 进到工程目录
cur_path = os.getcwd()
os.chdir(resource_path)
test_return = cmd_test(resource_path, py_path, new_report_path, resource_path, sn, test_num, pr_url)
# 执行完回到原来的目录
os.chdir(cur_path)
if test_return == 1:
return True
if test_return == 98:
@ -252,7 +259,7 @@ class liteOsUpgrade_RK3568(BaseApp):
else:
#if test_num != "2/2":
# hdc_kill()
os.system("hdc_std -t %s shell reboot loader" % sn)
os.system("hdc -t %s shell reboot loader" % sn)
time.sleep(5)
check_times += 1
logger.error("Failed to enter the loader mode!")
@ -385,10 +392,10 @@ class liteOsUpgrade_RK3568(BaseApp):
@timeout(30)
def hdc_kill():
logger.info("kill the process")
os.system("hdc_std kill")
os.system("hdc kill")
time.sleep(2)
logger.info("start the process")
os.system("hdc_std -l5 start")
os.system("hdc -l5 start")
# time.sleep(10)
@ -413,9 +420,9 @@ def send_times(mycmd):
@timeout(180)
def start_cmd(sn):
try:
os.system("hdc_std -l5 start")
power_cmd = "hdc_std -t %s shell \"power-shell setmode 602\"" % sn
hilog_cmd = "hdc_std -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn
os.system("hdc -l5 start")
power_cmd = "hdc -t %s shell \"power-shell setmode 602\"" % sn
hilog_cmd = "hdc -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn
logger.info(power_cmd)
logger.info(hilog_cmd)
power_result = sendCmd(power_cmd)

View File

@ -0,0 +1,51 @@
[
{
"bundle&processName": "com.ohos.launcher",
"apl": "2"
},
{
"bundle&processName": "com.ohos.settings",
"apl": "2"
},
{
"bundle&processName": "com.ohos.systemui",
"apl": "2"
},
{
"bundle&processName": "com.ohos.screenlock",
"apl": "2"
},
{
"bundle&processName": "com.ohos.adminprovisioning",
"apl": "2"
},
{
"bundle&processName": "edm",
"apl": "3"
},
{
"bundle&processName": "com.ohos.settings.faceauth",
"apl": "2"
},
{
"bundle&processName": "cn.openharmony.inputmethodchoosedialog",
"apl":"3"
},
{
"bundle&processName":"media_service",
"apl":"3"
},
{
"bundle&processName":"com.ohos.amsdialog",
"apl":"3"
},
{
"bundle&processName":"com.ohos.useriam.authwidget",
"apl":"2"
},
{
"bundle&processName":"com.ohos.devicetest",
"apl":"3"
}
]

View File

@ -0,0 +1,22 @@
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [Main] --------APL Check Begin!--------
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [read_device] database start downloading!
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [read_device] D:\gitee\lyl\xdevice\developtools_integration_verification\cases\smoke\basic\screenshot32\xdevice_smoke\APL_compare_03\access_token.db download successful!
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [read_device] "select process_name,apl from native_token_info_table" query successful!
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [read_device] "select bundle_name,apl from hap_token_info_table" query successful!
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.useriam.authwidget apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = ohos.samples.distributedmusicplayer apl = 3
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.note apl = 3
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.launcher apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = ohos.telephony.resources apl = 3
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.systemui apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.notificationdialog apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.smartperf apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.camera apl = 3
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.settings apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.filepicker apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = ohos.samples.distributedcalc apl = 3
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.devicetest apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.settings.faceauth apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_hap_apl] bundleName = com.ohos.adminprovisioning apl = 2
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [compare_native_apl] processName = edm apl = 3
2024-01-05 16:44:25 Fri 2024-01-05 16:44:25 Info [Main] --------APL Check End! --------

View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
# PATH='D:\\repo_test\\APL_compare_02\\'
# PATH=os.getcwd()+'/'
PATH = os.path.dirname(os.path.realpath(__file__)) + os.sep
# read_excel.py
'''
SHEET_NAMEexcel中的表名中英文都可
COLSexcel中的列号从0开始
SVNSVN的安装目录下/bin目录SVN在环境变量中的位置
SVN_URLexcel文件对应的url
USERsvn的用户名
PWDsvn的密码
FILE_PATH本地下载文件的路径
'''
SHEET_NAME="Sheet1"
COLS=[1,3]
SVN='D:/TortoiseSVN/bin'
SVN_URL='https://PMAIL_2140981.china.huawei.com/svn/test测试/01 目录/01_1 目录/APL基线标准v1.0.xlsx'
USER='hhhhs'
PWD='123456'
FILE_PATH=PATH+SVN_URL.split('/')[-1]
# read_device.py
'''
SQL_SRC设备上的数据库路径
SQL_DES本地下载文件路径
DOWNLOAD_DB从设备下载的hdc命令
QUERY_HAP_APL查询HAP APL的sql语句查询多列可以依次添加字段添加字段的顺序为比较时的字段优先级
QUERY_NATIVE_APL查Native APL的sql语句
'''
SQL_SRC=" /data/service/el1/public/access_token/access_token.db"
SQL_DES=PATH
DOWNLOAD_DB="hdc -t {} file recv"
QUERY_HAP_APL="select bundle_name,apl from hap_token_info_table"
QUERY_NATIVE_APL="select process_name,apl from native_token_info_table"
'''
APL_LOG_FILE执行脚本的日志信息
APL_RECORD_PATHAPL对比记录的日志信息
IS_OVERWRITE是否覆盖之前的APL日志w表示覆盖a表示追加
'''
APL_LOG_FILE=PATH+'apl_compare.log'
APL_RECORD_PATH=PATH+'apl_record.txt'
IS_OVERWRITE='w'

View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
import math
import enum
import time
import logging
import threading
from apl_config import *
log_tag = 'common'
apl_file_log = logging.FileHandler(filename=APL_LOG_FILE, mode='a', encoding='utf-8')
fmt = logging.Formatter(fmt="%(asctime)s %(message)s", datefmt='%Y-%m-%d %H:%M:%S %a')
apl_file_log.setFormatter(fmt)
# 定义日志
apl_logger = logging.Logger(name = 'apl_compare_log', level=logging.INFO)
apl_logger.addHandler(apl_file_log)
class ErrorType(enum.Enum):
not_in_apl_table = 1
apl_is_invalid = 2
class ApiLevel(enum.Enum):
normal = 1
system_basic = 2
system_core = 3
class LogLevel(enum.Enum):
Error = 1
Info = 2
class AplCompareException(Exception):
def __init__(self, msg):
self.msg = msg
class AplCompareThread(threading.Thread):
def __init__(self, func, args=()):
super(AplCompareThread, self).__init__()
self.func = func
self.args = args
self.result = None
def run(self):
self.result = self.func(*self.args)
def get_result(self):
threading.Thread.join(self)
try:
return self.result
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.args[0]))
return None
def apl_log(msg):
# 写日志
apl_logger.info(msg)
def apl_set_log_content(level, tag, msg):
log_content = timestamp() + ' {}'.format(level) + ' [{}]'.format(tag) + ' {}'.format(msg)
print(log_content)
apl_log(log_content)
return(log_content)
def set_error_record(name,error):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())+' %(name)-50s: %(error)-50s\n'%{'name':name,'error':error}
def set_map(results):
if results == None:
return None
res_map = {}
for result in results:
res_map[result[0]] = set_value(result[1:])
return res_map
def set_value(result):
value = []
for res in result:
if math.isnan(res):
res = 0
value.append(res)
return value
def timestamp():
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

View File

@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from read_device import *
from read_excel import *
from apl_config import *
def whitelist_check(apl, value, fields_from_whitelist):
# True 包含在白名单内
check = value in fields_from_whitelist.keys()
is_pass = False
if check and str(apl) == fields_from_whitelist[value]:
is_pass = True
return is_pass
def compare_hap_apl(fields_from_device, fields_from_whitelist):
records = []
log_tag = 'compare_hap_apl'
hap_check = True
for value in fields_from_device:
apl = fields_from_device[value][0]
if apl > 1:
is_pass = whitelist_check(apl, value, fields_from_whitelist)
info = 'bundleName = {} apl = {}'.format(value, str(apl))
if is_pass == False:
hap_check = False
# info = value
# info = 'bundleName = {} apl = {}'.format(value, str(apl))
log_content = apl_set_log_content(LogLevel(1).name, log_tag, info)
records.append(log_content)
else:
apl_set_log_content(LogLevel(2).name, log_tag, info)
return records, hap_check
def compare_native_apl(fields_from_device, fields_from_whitelist):
records = []
log_tag = 'compare_native_apl'
native_check = True
for value in fields_from_device:
apl = fields_from_device[value][0]
if apl > 2:
info = 'processName = {} apl = {}'.format(value, str(apl))
is_pass = whitelist_check(apl, value, fields_from_whitelist)
if is_pass == False:
native_check = False
log_content = apl_set_log_content(LogLevel(1).name, log_tag, info)
records.append(log_content)
else:
apl_set_log_content(LogLevel(2).name, log_tag, info)
return records, native_check
def fields_compare_write_once(fields_from_device,fields_from_excel):
records=[]
for bundle_name in fields_from_device.keys():
if bundle_name not in fields_from_excel.keys():
record=(bundle_name,ErrorType(1).name)
records.append(record)
continue
fields=fields_from_device[bundle_name]
standard_fields=fields_from_excel[bundle_name]
if not isInvalid(fields,standard_fields):
record=(bundle_name,ErrorType(2).name)
records.append(record)
print('Compare successful!')
return records
def isInvalid(fields,standard_fields):
if len(fields) == 1:
return fields[0] <= standard_fields[0]
for field, standard_field in fields, standard_fields:
if field>standard_field:
return False
return True
def write_record(name,error):
try:
file = open(APL_RECORD_PATH,'a')
err_record = set_error_record(name, error)
file.write(err_record)
file.close()
except Exception as e:
log_content=apl_set_log_content(str(e))
apl_log(log_content)
def write_record_once(err_records,is_overwrite):
try:
file=open(APL_RECORD_PATH,is_overwrite)
for record in err_records:
err_record = set_error_record(record[0],record[1])
file.write(err_record)
file.close()
except Exception as e:
log_content=apl_set_log_content(str(e))
apl_log(log_content)
def excel_thread():
try:
# settings={
# ' svn': SVN,
# 'url': url_encode(SVN_URL),
# 'user': USER,
# 'pwd': PWD,
# 'dir': FILE_PATH,
# }
# excel_file = FILE_PATH #svn_checkout(settings)
log_tag = 'excel_thread'
# if excel_file == None:
# apl_set_log_content(LogLevel(2).name, log_tag, 'svn_checkoutc failed') #raise
# apl_from_excel = read_excel(excel_file, sheet = SHEET_NAME, cols = COLS)
# path = PATH + 'APL基线标准v1.0.json'
path = PATH + 'temp.json'
apl_from_json = read_json(path)
return apl_from_json
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, 'excel_thread catch error: {}'.format(e.args[0]))
return None
def sql_thread(sn, sn2):
log_tag = 'sql_thread'
try:
print(DOWNLOAD_DB.format(sn)+' ' + SQL_SRC + ' ' + SQL_DES)
print()
sql_file = download_from_device(DOWNLOAD_DB.format(sn), SQL_SRC, SQL_DES)
if sql_file == None:
raise
query_hap_apl_thread = AplCompareThread(query_hap_apl, (sql_file, QUERY_HAP_APL))
query_native_apl_thread = AplCompareThread(query_native_apl, (sql_file, QUERY_NATIVE_APL))
query_hap_apl_thread.start()
query_native_apl_thread.start()
query_hap_apl_thread.join()
query_native_apl_thread.join()
hap_apl_map = query_hap_apl_thread.get_result()
native_apl_map = query_native_apl_thread.get_result()
return hap_apl_map, native_apl_map
except:
apl_set_log_content(LogLevel(1).name, log_tag, 'download_from_device failed')
return None,None
def apl_check_main(sn):
try:
log_tag = 'Main'
apl_set_log_content(LogLevel(2).name, log_tag, '--------APL Check Begin!--------')
excel_thr = AplCompareThread(excel_thread)
sql_thr = AplCompareThread(sql_thread, (sn, sn))
excel_thr.start()
sql_thr.start()
excel_thr.join()
sql_thr.join()
apl_from_excel = excel_thr.get_result()
hap_apl_map, native_apl_map = sql_thr.get_result()
if apl_from_excel == None or hap_apl_map == None or native_apl_map == None:
raise
hap_results, hap_check = compare_hap_apl(hap_apl_map, apl_from_excel)
native_results, native_check = compare_native_apl(native_apl_map, apl_from_excel)
write_record_once(hap_results, IS_OVERWRITE)
write_record_once(native_results, 'a')
if native_check == False or hap_check == False:
apl_set_log_content(LogLevel(1).name, log_tag, '--------APL Check failed![hap = {}, native = {}] --------'.format(hap_check, native_check))
apl_set_log_content(LogLevel(2).name, log_tag, '--------APL Check End! --------')
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '--------APL Check failed![hap = False, native = False] --------')
apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e.args[0]))
if __name__ == '__main__':
try:
sn = sys.argv[1]
except:
sn_list = []
result = os.popen('hdc list targets')
res = result.read()
for line in res.splitlines():
sn_list.append(line)
sn = sn_list[0]
apl_check_main(sn)

View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
from subprocess import run
import os
import sqlite3
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from common import *
from apl_config import *
log_tag = 'read_device'
#从设备中导出数据库
def download_from_device(cmd,sql_src,sql_des):
download_cmd=cmd+' '+sql_src+' '+sql_des
apl_set_log_content(LogLevel(2).name, log_tag, 'database start downloading!')
try:
result = os.popen(download_cmd)
stdout = result.read()
print(stdout)
if 'Fail' in stdout:
raise AplCompareException(stdout.replace('\n\n','').replace('[Fail]', ''))
#sql_file=sql_des+'\\'+sql_src.split('/').pop()
sql_file = sql_des+sql_src.split('/').pop()
apl_set_log_content(LogLevel(2).name, log_tag, '{} download successful!'.format(sql_file))
return sql_file
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
def sql_connect(db):
try:
if not os.path.exists(db):
raise AplCompareException('{} is not exists!'.format(db))
conn = sqlite3.connect(db)
return conn
except AplCompareException as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
#数据库语句查询
def query_records(db,sql):
log_content = ''
try:
conn = sql_connect(db)
if conn == None:
raise AplCompareException('{} cannot connect!'.format(db))
cursor = conn.cursor()
cursor.execute(sql)
results = cursor.fetchall()
conn.close()
apl_set_log_content(LogLevel(2).name, log_tag, '"{}" query successful!'.format(sql))
return results
except sqlite3.OperationalError as e:
apl_set_log_content(LogLevel(2).name, log_tag, 'database {}'.format(e.args[0]))
return None
except AplCompareException as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
#查询hap_token_info_table中的bundle_name和apl
def query_hap_apl(db,sql):
results = query_records(db, sql)
return set_map(results)
#查询native_token_info_table中的process_name和apl
def query_native_apl(db,sql):
results = query_records(db, sql)
return set_map(results)

View File

@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
# import subprocess
# import pandas as pd
# import urllib.parse
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from common import *
from apl_config import *
import json
log_tag = 'read_whitelist'
# # 全部文件夹检出本地已经安装svn
# def svn_checkout(settings):
# try:
# print(settings['url'])
# print(settings['dir'])
# os.chdir(settings['svn'])
# cmd = 'svn export --force %(url)s %(dir)s --username %(user)s --password %(pwd)s'%settings
# p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
# stdout,stderr = p.communicate()
# print(stderr)
# if stderr != b'':
# raise AplCompareException(str(stderr,'utf-8').replace('\r\n','\t'))
# apl_set_log_content(LogLevel(2).name, log_tag, '{} export successful!'.format(settings['dir']))
# return settings['dir']
# except Exception as e:
# apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e.msg))
# return None
#
# #url编码
# def url_encode(url):
# partions=url.split("/",3)
# encode_url=partions[0]
# partions[-1]=urllib.parse.quote(partions[-1])
# for partion in partions[1:]:
# encode_url=encode_url+'/'+partion
# return encode_url
#
# def read_excel(file, sheet, cols):
# try:
# df = pd.read_excel(file, sheet_name = sheet, usecols = cols)
# data_list = df.values.tolist()
# apl_map = set_map(data_list)
# apl_set_log_content(LogLevel(2).name, log_tag, '{} read successful!'.format(file))
# return apl_map
# except (ValueError,FileNotFoundError) as e:
# apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e.msg))
# return None
#
def read_json(path):
try:
with open(path, 'r') as f:
file = f.read()
data_list = json.loads(file)
res_dict = set_dict(data_list)
return res_dict
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
def set_dict(data_list: list()):
res_dict = {}
for res in data_list:
res_dict[res['bundle&processName']] = res['apl']
return res_dict

View File

@ -0,0 +1,121 @@
## 版本
python版本3.8.10
pip版本:22.1.2
python依赖
```
pip install pandas
pip install openpyxl
pip install subprocess
```
## 使用
`python compare.py`
## 目录
```
APL_compare
├── apl_config.py # 整个目录中的常量定义
├── read_device.py # 从设备中下载db并解析表和字段的函数
├── read_excel.py # 从excel中解析表和字段的函数
├── compare.py # 脚本运行入口
└── common.py # 公共需要用到的函数
```
## apl_config.py
常量定义
`PATH`:当前目录的地址
### read_excel.py
`SHEET_NAME`excel中的表名
`COLS`excel中的列名下标从0开始
`SVN`SVN的安装目录下的bin目录
`SVN_URL`excel文件在SVN上对应的url
`USER`svn的用户名
`PWD`svn的密码
`FILE_PATH`:本地下载文件的路径
`SQL_SRC`:设备上的数据库路径
`SQL_DES`:本地下载文件路径
`DOWNLOAD_DB`从设备下载的hdc命令
`QUERY_HAP_APL`查询HAP APL的sql语句查询多列可以依次添加字段添加字段的顺序为比较时的字段优先级
`QUERY_NATIVE_APL`查Native APL的sql语句
`APL_LOG_FILE`:执行脚本的日志路径
`APL_RECORD_PATH`APL对比记录的日志路径
`IS_OVERWRITE`是否覆盖之前的APL日志w表示覆盖a表示追加
## read_device.py
用于从设备上导出数据库,并解析表和字段
### 数据库导出
函数:`download_from_device(cmd,sql_src,sql_des)`
hdc命令`cmd`
设备中数据库路径:`sql_src`
本地数据库路径:`sql_des`
执行命令:`hdc file recv sql_src sql_des`
### 连接数据库
相关函数:`sql_connect(db)`
传入参数:`db`--db文件存放路径
返回结果:`conn`--数据库的连接
### sql语句查询
相关函数:`query_records(db,sql)`
传入参数:`db`--需要连接的数据库;`sql`sql查询语句
返回结果:`results`--查询结果
### 查hap_token_info_table中的bundle_name和apl
sql语句`QUERY_HAP_APL="select bundle_name,apl from hap_token_info_table"`
相关函数:`query_hap_apl(db,sql)`
传入参数:`db`--需要连接的数据库;`sql`sql查询语句
返回结果:`res_map`--查询结果转化为的字典mapkey是bundle_namevalue是apl
### 查询native_token_info_table中的process_name和apl
sql语句`QUERY_NATIVE_APL="select process_name,apl from native_token_info_table"`
相关函数:`query_native_apl(db,sql)`
传入参数:`db`--需要连接的数据库;`sql`--sql查询语句
返回结果:`res_map`--查询结果转化为的字典mapkey是process_namevalue是apl
## read_excel.py
### 从svn上下载excel
相关函数:`syn_checkout(settings)`
传入参数:`settings`--包含svn上文件路径本地路径用户名密码
返回结果:`settings['dir']`--本地下载路径
### url编码
相关函数:`url_encode(url)`
传入参数:`url`
返回结果:`encode_url`
### 解析excel
相关函数:`read_excel(file,sheet,cols)`
传入参数:`file`--excel文件`sheet`--表名,`cols`--列名
返回结果:`apl_map`----查询结果转化为的字典mapkey是bundle/process_namevalue是apl
## common.py
### 脚本执行过程中的错误日志
相关函数:`log(msg)`
相关参数:`msg`--错误信息
### 设置脚本执行过程中的日志信息
相关函数:`apl_set_log_content(msg)`
相关参数:`msg`--日志信息,`is_error`--用于判断是执行失败、成功
返回结果:带时间戳的日志信息
### 设置apl记录的格式
相关函数set_error_record(name,error)
相关参数:`name`--bundle name或者native name`error`--错误原因
返回结果:带时间戳的记录
### 将查询结果转化成map的结构
相关函数:`set_map(results)`
传入参数:`results`--查询结果的列表
返回结果:`res_map`
### 转换查询结果map的value格式
相关函数:`set_value(result)`
传入参数:`result`--查询到的每一行结果
返回结果:`value`--包含查询到的字段的列表
### 时间戳
相关函数:`timestamp()`
返回结果:时间戳
### 错误类型
`ErrorType`:枚举类
### 自定义异常
`AplCompareException`
### 自定义线程
`AplCompareThread`
### 日志格式设置
`logging.basicConfig`

View File

@ -0,0 +1,90 @@
[
{
"bundle&processName": "com.ohos.launcher",
"apl": "2"
},
{
"bundle&processName": "com.ohos.settings",
"apl": "2"
},
{
"bundle&processName": "com.ohos.systemui",
"apl": "2"
},
{
"bundle&processName": "com.ohos.screenlock",
"apl": "2"
},
{
"bundle&processName": "com.ohos.adminprovisioning",
"apl": "2"
},
{
"bundle&processName": "edm",
"apl": "3"
},
{
"bundle&processName": "com.ohos.settings.faceauth",
"apl": "2"
},
{
"bundle&processName": "cn.openharmony.inputmethodchoosedialog",
"apl":"3"
},
{
"bundle&processName":"media_service",
"apl":"3"
},
{
"bundle&processName":"com.ohos.amsdialog",
"apl":"3"
},
{
"bundle&processName":"com.ohos.useriam.authwidget",
"apl":"2"
},
{
"bundle&processName":"com.ohos.powerdialog",
"apl":"2"
},
{
"bundle&processName":"com.ohos.filepicker",
"apl":"2"
},
{
"bundle&processName":"com.ohos.camera",
"apl":"3"
},
{
"bundle&processName":"com.ohos.smartperf",
"apl":"2"
},
{
"bundle&processName":"com.ohos.devicemanagerui",
"apl":"2"
},
{
"bundle&processName":"ohos.telephony.resources",
"apl":"3"
},
{
"bundle&processName":"com.ohos.notificationdialog",
"apl":"2"
},
{
"bundle&processName":"ohos.samples.distributedcalc",
"apl":"3"
},
{
"bundle&processName":"ohos.samples.distributedmusicplayer",
"apl":"3"
},
{
"bundle&processName":"com.ohos.note",
"apl":"3"
},
{
"bundle&processName":"com.ohos.devicetest",
"apl":"2"
}
]

View File

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from read_acl_whitelist import *
from resolve_token_info import *
from acls_check.utils import *
log_tag = 'acl_check'
def whitelist_check(whitelist, acls):
try:
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check',
'-------------------------- Trustlist Verification begin --------------------------')
check_pass = True
for k, v in acls.items():
if k in whitelist.keys():
temp = whitelist[k]
for acl in v:
if acl not in temp:
check_pass = False
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check',
'precessName = {} the acl = {} trustlist is not configured.'.format(k, acl))
else:
check_pass = False
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check', 'precessName = {} the acls = {} trustlist is not configured.'.format(k, v))
if check_pass == False:
raise AclCheckException(
'-------------------------- Trustlist Verification failed --------------------------')
else:
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check',
'-------------------------- Trustlist Verification successful --------------------------')
except Exception as e:
set_log_content(LogLevel(1).name, log_tag + '->whitelist_check', e.args)
raise
def acl_check_main(sn):
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- ACL check begin --------------------------')
try:
hdc_command(GENERATING_TOKEN_INFO_COMMAND.format(sn, TOKEN_INFO_URL.format(sn)))
hdc_command(DOWNLOAD_TOKEN_INFO_COMMAND.format(sn, TOKEN_INFO_URL.format(sn), DOWNLOAD_TOKEN_INFO_URL.format(sn)))
hdc_command(CLEAR_TOKEN_INFO_FILE.format(sn, TOKEN_INFO_URL.format(sn)))
file = read_txt(DOWNLOAD_TOKEN_INFO_URL.format(sn))
# clear_token_info_txt(DOWNLOAD_TOKEN_INFO_URL.format(sn))
acls_dict = check_and_get(file)
acl_whitelist = read_json(PATH + 'acl_whitelist.json')
whitelist = get_acl_dict(acl_whitelist)
whitelist_check(whitelist, acls_dict)
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.args)
set_log_content(LogLevel(1).name, log_tag,
'-------------------------- ACL check failed --------------------------')
finally:
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- ACL check end --------------------------')
if __name__ == '__main__':
sn = ''
acl_check_main(sn)

View File

@ -0,0 +1,177 @@
[
{
"processName": "hiview",
"acls": [
"ohos.permission.DUMP"
]
},
{
"processName": "privacy_service",
"acls": [
"ohos.permission.MANAGE_DISPOSED_APP_STATUS"
]
},
{
"processName": "inputmethod_service",
"acls": [
"ohos.permission.INPUT_MONITORING"
]
},
{
"processName": "memmgrservice",
"acls": [
"ohos.permission.INTERACT_ACROSS_LOCAL_ACCOUNTS_EXTENSION"
]
},
{
"processName": "locationhub",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "useriam",
"acls": [
"ohos.permission.ACCESS_AUTH_RESPOOL",
"ohos.permission.INTERACT_ACROSS_LOCAL_ACCOUNTS_EXTENSION"
]
},
{
"processName": "pinauth",
"acls": [
"ohos.permission.ACCESS_AUTH_RESPOOL"
]
},
{
"processName": "foundation",
"acls": [
"ohos.permission.PUBLISH_SYSTEM_COMMON_EVENT",
"ohos.permission.PERMISSION_START_ABILITIES_FROM_BACKGROUND",
"ohos.permission.GRANT_SENSITIVE_PERMISSIONS",
"ohos.permission.REVOKE_SENSITIVE_PERMISSIONS",
"ohos.permission.MANAGE_HAP_TOKENID",
"ohos.permission.START_INVISIBLE_ABILITY",
"ohos.permission.INPUT_MONITORING",
"ohos.permission.INSTALL_SANDBOX_BUNDLE"
]
},
{
"processName": "dscreen",
"acls": [
"ohos.permission.CAPTURE_SCREEN"
]
},
{
"processName": "sensors",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "camera_service",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "audio_server",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "msdp_sa",
"acls": [
"ohos.permission.INPUT_MONITORING",
"ohos.permission.ACCESS_DISTRIBUTED_HARDWARE",
"ohos.permission.INTERCEPT_INPUT_EVENT"
]
},
{
"processName": "dslm_service",
"acls": [
"ohos.permission.ACCESS_IDS"
]
},
{
"processName": "accountmgr",
"acls": [
"ohos.permission.ENFORCE_USER_IDM",
"ohos.permission.STORAGE_MANAGER_CRYPT"
]
},
{
"processName": "hdcd",
"acls": [
"ohos.permission.GET_BUNDLE_INFO_PRIVILEGED",
"ohos.permission.INSTALL_BUNDLE",
"ohos.permission.LISTEN_BUNDLE_CHANGE",
"ohos.permission.CHANGE_ABILITY_ENABLED_STATE",
"ohos.permission.REMOVE_CACHE_FILES",
"ohos.permission.START_ABILITIES_FROM_BACKGROUND",
"ohos.permission.PERMISSION_USED_STATS",
"ohos.permission.DUMP",
"ohos.permission.NOTIFICATION_CONTROLLER",
"ohos.permission.PUBLISH_SYSTEM_COMMON_EVENT",
"ohos.permission.CLEAN_APPLICATION_DATA"
]
},
{
"processName": "softbus_server",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "backup_sa",
"acls": [
"ohos.permission.INSTALL_BUNDLE"
]
},
{
"processName": "media_service",
"acls": [
"ohos.permission.CAPTURE_SCREEN"
]
},
{
"processName": "security_component_service",
"acls": [
"ohos.permission.GRANT_SENSITIVE_PERMISSIONS",
"ohos.permission.REVOKE_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "distributedsched",
"acls": [
"ohos.permission.INPUT_MONITORING",
"ohos.permission.MANAGE_MISSIONS"
]
},
{
"processName": "accessibility",
"acls": [
"ohos.permission.INTERCEPT_INPUT_EVENT"
]
},
{
"processName": "dlp_permission_service",
"acls": [
"ohos.permission.INSTALL_SANDBOX_BUNDLE",
"ohos.permission.UNINSTALL_SANDBOX_BUNDLE"
]
},
{
"processName": "quick_fix",
"acls": [
"ohos.permission.INSTALL_QUICK_FIX_BUNDLE",
"ohos.permission.UNINSTALL_QUICK_FIX_BUNDLE"
]
},
{
"processName": "sharing_service",
"acls": [
"ohos.permission.CAPTURE_SCREEN"
]
}
]

View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import platform
import time
# 系统分隔符
SYS_SEQ = os.sep
# 系统平台
SYS_PLATFORM = platform.system()
PATH = os.path.dirname(os.path.realpath(__file__)) + SYS_SEQ
# 脚本的执行日志
LOG_FILE = PATH + SYS_SEQ + "native_sa.log"
# 设备上生成的token info 文件名
TOKEN_INFO_NAME = 'token_info_'+ str(time.time_ns()) +'_{}.txt'
# 设备上生成文件存放位置
TOKEN_INFO_URL = '/data/{}'.format(TOKEN_INFO_NAME)
# 设备上文件生成命令
GENERATING_TOKEN_INFO_COMMAND = 'hdc -t {} shell atm dump -t > {}'
# 下载token info 文件存放路径
DOWNLOAD_TOKEN_INFO_URL = PATH + TOKEN_INFO_NAME
# 文件下载命令
DOWNLOAD_TOKEN_INFO_COMMAND = 'hdc -t {} file recv {} {}'
# 删除设备上的文件命令
CLEAR_TOKEN_INFO_FILE = 'hdc -t {} shell rm -rf {}'

View File

@ -0,0 +1,16 @@
2024-01-05 16:44:26 Fri 2024-01-05 16:44:26 Info [acl_check] -------------------------- ACL check begin --------------------------
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [utils] ['hdc', '-t', '150100424a5444345209d941be3fb900', 'shell', 'atm', 'dump', '-t', '>', '/data/token_info_1704444266821975400_150100424a5444345209d941be3fb900.txt'] operation fuccessful!
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [utils] ['hdc', '-t', '150100424a5444345209d941be3fb900', 'file', 'recv', '/data/token_info_1704444266821975400_150100424a5444345209d941be3fb900.txt', 'D:\\gitee\\lyl\\xdevice\\developtools_integration_verification\\cases\\smoke\\basic\\screenshot32\\xdevice_smoke\\acls_check\\token_info_1704444266821975400_150100424a5444345209d941be3fb900.txt'] operation fuccessful!
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [utils] ['hdc', '-t', '150100424a5444345209d941be3fb900', 'shell', 'rm', '-rf', '/data/token_info_1704444266821975400_150100424a5444345209d941be3fb900.txt'] operation fuccessful!
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [resolve_token_info] read D:\gitee\lyl\xdevice\developtools_integration_verification\cases\smoke\basic\screenshot32\xdevice_smoke\acls_check\token_info_1704444266821975400_150100424a5444345209d941be3fb900.txt
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [resolve_token_info] -------------------------- invalidPermList check begin --------------------------
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [resolve_token_info] -------------------------- The invalidPermList check successful --------------------------
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [read_acl_whitelist] read D:\gitee\lyl\xdevice\developtools_integration_verification\cases\smoke\basic\screenshot32\xdevice_smoke\acls_check\acl_whitelist.json
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [acl_check->whitelist_check] -------------------------- Trustlist Verification begin --------------------------
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [acl_check->whitelist_check] precessName = quick_fix the acls = ['ohos.permission.INSTALL_QUICK_FIX_BUNDLE', 'ohos.permission.UNINSTALL_QUICK_FIX_BUNDLE'] trustlist is not configured.
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [acl_check->whitelist_check] precessName = foundation the acl = ohos.permission.INSTALL_SANDBOX_BUNDLE trustlist is not configured.
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [acl_check->whitelist_check] precessName = dlp_permission_service the acls = ['ohos.permission.INSTALL_SANDBOX_BUNDLE', 'ohos.permission.UNINSTALL_SANDBOX_BUNDLE'] trustlist is not configured.
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Error [acl_check->whitelist_check] ('-------------------------- Trustlist Verification failed --------------------------',)
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Error [acl_check] ('-------------------------- Trustlist Verification failed --------------------------',)
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Error [acl_check] -------------------------- ACL check failed --------------------------
2024-01-05 16:44:27 Fri 2024-01-05 16:44:27 Info [acl_check] -------------------------- ACL check end --------------------------

View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
import json
from acls_check.utils import *
log_tag = 'read_acl_whitelist'
def read_json(path):
set_log_content(LogLevel(2).name, log_tag, 'read {}'.format(path))
if not os.path.exists(path):
set_log_content(LogLevel(2).name, log_tag, '{} file not exits'.format(path))
raise AclCheckException('{} file not exits'.format(path))
try:
with open(path, 'r') as f:
file = f.read()
return file
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise AclCheckException('{} failed to read the file.'.format(path))
def get_acl_dict(file):
try:
acls_dict = {}
f = json.loads(file)
for it in f:
key = it.get('processName')
values = it.get('acls')
acls_dict[key] = values
return acls_dict
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
raise

View File

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from acls_check.utils import *
log_tag = 'resolve_token_info'
def check_and_get(file: list):
nativeAcls = {}
try:
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- invalidPermList check begin --------------------------')
check_pass = True
processName = 'xxxxxxxx'
for it in file:
if it.find('processName') != -1:
processName = it.replace(',', '').split(':')[1].split('"')[1]
elif it.find('invalidPermList') != -1:
check_pass = False
msg = 'invalidPermList information is detected in processName = {}'.format(processName)
set_log_content(LogLevel(2).name, log_tag, msg)
elif check_pass and it.find('nativeAcls') != -1:
bb = it.split(':')
if bb[1].split('"')[1].__len__() == 0:
continue
permissionNameList = bb[1].split('"')[1].split(',')
nativeAcls[processName] = permissionNameList
if check_pass == False:
raise AclCheckException('-------------------------- The invalidPermList check failed --------------------------')
else:
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- The invalidPermList check successful --------------------------')
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise
return nativeAcls
def clear_token_info_txt(path):
try:
os.remove(path)
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
def read_txt(path):
set_log_content(LogLevel(2).name, log_tag, 'read {}'.format(path))
if not os.path.exists(path):
set_log_content(LogLevel(2).name, log_tag, '{} file not exits'.format(path))
raise AclCheckException('{} file not exits!'.format(path))
try:
with open(path, 'r') as f:
file = f.readlines()
return file
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise AclCheckException('{} failed to read the file.'.format(path))

View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
import os
import sys
from subprocess import Popen, PIPE, STDOUT
# sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from config import *
log_tag = 'utils'
class AclCheckException(Exception):
def __init__(self, msg):
self.msg = msg
def timestamp():
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
class LogLevel(enum.Enum):
Error = 1
Info = 2
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S %a')
def log(msg):
logging.error(msg)
def set_log_content(level, tag, msg):
log_content = timestamp() + ' {}'.format(level) + ' [{}]'.format(tag) + ' {}'.format(msg)
print(log_content)
log(log_content)
return (log_content)
def shell_command(command_list: list):
try:
print(command_list)
process = Popen(command_list, stdout=PIPE, stderr=STDOUT)
exitcode = process.wait()
set_log_content(LogLevel(2).name, log_tag, '{} operation fuccessful!'.format(command_list))
return process, exitcode
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise AclCheckException(e.msg)
def hdc_command(command):
print(command)
command_list = command.split(' ')
_, exitcode = shell_command(command_list)
return exitcode

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2020 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<user_config>
<environment>
<device type="usb-hdc">
<ip></ip>
<port></port>
<sn></sn>
</device>
<device type="com" label="wifiiot">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com></com>
<type>deploy</type>
<baud_rate>115200</baud_rate>
</serial>
</device>
<device type="com" label="ipcamera">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device type="com" label="ipcamera">
<ip></ip>
<port></port>
</device>
</environment>
<testcases>
<dir>testcases</dir>
<server label="NfsServer">
<ip></ip>
<port></port>
<dir></dir>
<username></username>
<password></password>
<remote></remote>
</server>
</testcases>
<resource>
<dir></dir>
</resource>
<devicelog>ON</devicelog>
<loglevel>DEBUG</loglevel>
</user_config>

View File

@ -0,0 +1,143 @@
import argparse
import os.path
import shutil
import sys
from xdevice.__main__ import main_process
from xdevice._core.report.result_reporter import ResultReporter
from xml.dom.minidom import parse
import pandas as pd
from datetime import datetime
BASE_DIR = os.path.dirname(__file__)
TEST_CASE_MUST = [
'Launcher',
'ProcessCheck',
'APLCheck',
'ACLCheck',
]
TEST_CASE_DEVICE1 = [
'SettingsWifi',
'CrashCheck',
'Photos',
'Camera',
]
TEST_CASE_DEVICE2 = [
'NotificationBar',
# 'VideoTest',
'Note',
'Contacts',
'Mms',
'DistributedMusicPlayer'
]
def get_test_result(report_path):
try:
if not os.path.exists(report_path):
return False
rst = ResultReporter.get_task_info_params(report_path)
unsuccessful_params = rst.get('unsuccessful_params')
for case, step in unsuccessful_params.items():
if step:
return False
return True
except:
return False
def collect_test_result(report_path):
xml_report = os.path.join(report_path, 'summary_report.xml')
if not os.path.exists(xml_report):
return
timestamp = datetime.fromtimestamp(os.path.getmtime(xml_report))
test_date = timestamp.strftime('%Y-%m-%d')
try:
dom = parse(xml_report)
data = dom.documentElement
test_result = {
'用例名': [],
'测试结果': [],
'耗时': [],
'报错信息': [],
'报告路径': [],
}
testcases = data.getElementsByTagName('testsuite')
testcase_result = []
for t in testcases:
module_name = t.getAttribute('modulename')
result_kind = t.getAttribute('result_kind')
time = t.getAttribute('time')
testcase = t.getElementsByTagName('testcase')
message = testcase[0].getAttribute('message')
line = (module_name, result_kind, time, message, xml_report, test_date)
if line not in testcase_result:
testcase_result.append(line)
# csv
test_result['用例名'].append(module_name)
test_result['测试结果'].append(result_kind)
test_result['耗时'].append(time)
test_result['报错信息'].append(message)
test_result['报告路径'].append(xml_report)
df = pd.DataFrame(test_result)
with open('D:\\smoke_result_{}.csv'.format(test_date), 'a', newline='') as f:
df.to_csv(f, header=f.tell() == 0, index=False, mode='a')
except:
pass
if __name__ == '__main__':
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description='manual to this scription')
parser.add_argument('--config', type=str)
parser.add_argument('--test_num', type=str, default='1/1')
parser.add_argument('--tools_path', type=str)
parser.add_argument('--anwser_path', type=str)
parser.add_argument('--save_path', type=str)
parser.add_argument('--device_num', type=str)
parser.add_argument('--pr_url', type=str)
args = parser.parse_args()
new_cmd = 'run'
# 指定设备sn
if not args.device_num:
print("SmokeTest: End of check, test failed!")
sys.exit(98)
new_cmd += ' -sn {}'.format(args.device_num)
# 测试用例路径
tcpath = args.tools_path
new_cmd += ' -tcpath {}'.format(tcpath)
# 测试的设备编号1/1表示只有一台设备1/2表示第一台设备2/2表示第二台设备
if args.test_num == '1/1':
new_cmd += ' -l {}'.format(';'.join(TEST_CASE_MUST + TEST_CASE_DEVICE1 + TEST_CASE_DEVICE2))
elif args.test_num == '1/2':
new_cmd += ' -l {}'.format(';'.join(TEST_CASE_MUST + TEST_CASE_DEVICE1))
elif args.test_num == '2/2':
new_cmd += ' -l {}'.format(';'.join(TEST_CASE_MUST + TEST_CASE_DEVICE2))
# 指定报告生成路径
report_path = args.save_path
new_cmd += ' -rp {} -ta screenshot:true'.format(report_path)
# 测试资源路径
# respath = args.anwser_path
# new_cmd += ' -respath {}'.format(respath)
# shutil.rmtree(os.path.join(BASE_DIR, 'reports'), ignore_errors=True)
print('SmokeTest Begin >>>>>>>>>>>>')
main_process(new_cmd)
print('SmokeTest collect test result >>>>>>>>>>>>')
collect_test_result(report_path)
print('SmokeTest ending >>>>>>>>>>>>')
smoke_rst = get_test_result(report_path)
if smoke_rst:
print("SmokeTest: End of check, test succeeded!")
sys.exit(0)
print("SmokeTest: End of check, test failed!")
sys.exit(99)

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 148 KiB

View File

@ -0,0 +1,53 @@
####First_check_at_begin__Second_check_at_end__Two_results_must_be_same#####
com.ohos.launcher
render_service
####only_check_these_processes_are_exitst#####
hdf_devmgr
param_watcher
storage_manager
appspawn
hilogd
samgr
storage_daemon
uinput_inject
multimodalinput
huks_service
memmgrservice
bluetooth_servi
resource_schedu
bgtaskmgr_servi
audio_server
deviceauth_service
softbus_server
wifi_hal_service
faultloggerd
accountmgr
time_service
distributeddata
useriam
inputmethod_ser
ui_service
netmanager
sensors
media_service
wifi_manager_se
installs
hiview
telephony
camera_service
foundation
hdcd
light_host
vibrator_host
sensor_host
input_user_host
camera_host
audio_host
wifi_host
usb_host
blue_host
wifi_hal_service
com.ohos.systemui
device_usage_st
power_host

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["ACLCheck.py"]
}
}

View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
import os.path
from acls_check.acl_check import acl_check_main
from devicetest.api import Asserts
from test_case import ITestCase
class ACLCheck(ITestCase):
def __init__(self, controllers):
super().__init__(controllers)
self.native_sa = os.path.join(os.path.dirname(self.testcases_path), 'acls_check', 'native_sa.log')
def setup(self):
self.step('ACL check start')
def process(self):
self.step('clear native_sa.log first')
# 先删除文件内容
if os.path.exists(self.native_sa):
self.step('{} exist, delete before test'.format(self.native_sa))
with open(self.native_sa, 'w') as f:
f.write('')
self.step('call acls_check.acl_check.py...')
acl_check_main(self.device_name)
self.step('{} exist?:{}'.format(self.native_sa, os.path.exists(self.native_sa)))
with open(self.native_sa, mode='r', encoding='utf-8', errors='ignore') as f:
f.seek(0)
acl_result = f.read()
Asserts.assert_not_in('ACL check failed', acl_result)
def teardown(self):
self.step('ACL check finish')

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["APLCheck.py"]
}
}

View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
import os
from APL_compare_03.compare import apl_check_main
from devicetest.api import Asserts
from test_case import ITestCase
class APLCheck(ITestCase):
def __init__(self, controllers):
super().__init__(controllers)
self.apl_path = os.path.join(os.path.dirname(self.testcases_path), 'APL_compare_03', 'apl_compare.log')
def setup(self):
self.step('APL check start')
def process(self):
self.step('clear apl_compare.log first')
# 先删除文件内容
if os.path.exists(self.apl_path):
self.step('{} exist, delete before test'.format(self.apl_path))
with open(self.apl_path, 'w') as f:
f.write('')
self.step('call APL_compare_03.compare.py ...')
apl_check_main(self.device_name)
self.step('{} exist?:{}'.format(self.apl_path, os.path.exists(self.apl_path)))
with open(self.apl_path, mode='r', encoding='utf-8', errors='ignore') as f:
f.seek(0)
apl_result = f.read()
Asserts.assert_not_in('APL Check failed', apl_result)
def teardown(self):
self.step('APL check finish')

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["Camera.py"]
}
}

View File

@ -0,0 +1,63 @@
from devicetest.api import Asserts
from test_case import ITestCase
class Camera(ITestCase):
camera_ability_name = 'com.ohos.camera.MainAbility'
camera_bundle_name = 'com.ohos.camera'
photo_ability_name = 'com.ohos.photos.MainAbility'
photo_bundle_name = 'com.ohos.photos'
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
super().setup()
self.step('预置条件1开始相机测试')
def process(self):
self.step('步骤1开启camera hilog')
self.common_oh.shell(self.Phone1, 'rm -rf /data/log/hilog/* && hilog -b X;hilog -b D -T CAMERA;hilog -r')
self.common_oh.wait(self.Phone1, 1)
self.step('步骤2启动camera app')
self.common_oh.startAbility(self.Phone1, self.camera_ability_name, self.camera_bundle_name)
self.common_oh.wait(self.Phone1, 5)
Asserts.assert_true(self.common_oh.isProcessRunning(self.Phone1, self.camera_bundle_name))
self.step('步骤3点击拍照')
self.common_oh.click(self.Phone1, 360, 1095, mode='NORMAL')
self.common_oh.wait(self.Phone1, 3)
self.step('步骤4切换到录像模式')
self.common_oh.click(self.Phone1, 430, 980, mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
self.step('步骤5点击录制')
self.common_oh.click(self.Phone1, 360, 1095, mode='NORMAL')
self.common_oh.wait(self.Phone1, 3)
self.step('步骤6停止录制')
self.common_oh.click(self.Phone1, 320, 1095, mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
self.step('步骤7点击左下角切到相册')
self.common_oh.click(self.Phone1, 200, 1095, mode='NORMAL')
self.common_oh.wait(self.Phone1, 11)
self.step('步骤8hilog打包')
self.common_oh.shell(self.Phone1, 'cd data/log/hilog/;hilog -x > camera_log.txt;hilog -b D')
self.common_oh.wait(self.Phone1, 1)
self.step('步骤9结果检查')
self.common_oh.shell(self.Phone1, 'cd /data/log/hilog && grep -nr PreviewOutputCallback')
self.common_oh.wait(self.Phone1, 1)
picture_name = 'camera.jpeg'
self.take_picture_to_local(picture_name)
self.step('步骤10检查相册应用是否拉起')
rst = self.common_oh.shell(self.Phone1, 'aa dump -a | grep {}'.format(self.photo_ability_name))
self.common_oh.wait(self.Phone1, 5)
Asserts.assert_in(self.photo_bundle_name, rst)
def teardown(self):
self.step('stop camera & photo app')
self.common_oh.forceStopAbility(self.Phone1, self.camera_bundle_name)
self.common_oh.cleanApplicationData(self.Phone1, self.camera_bundle_name)
self.common_oh.forceStopAbility(self.Phone1, self.photo_bundle_name)
# self.collect_hilog('camera_log.tar')
self.step('camera test finish')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["Contacts.py"]
}
}

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import os
from devicetest.api import Asserts
from test_case import ITestCase
class Contacts(ITestCase):
ability_name = 'com.ohos.contacts.MainAbility'
bundle_name = 'com.ohos.contacts'
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
super().setup()
self.step('contacts test start, start app')
self.common_oh.startAbility(self.Phone1, self.ability_name, self.bundle_name)
def process(self):
self.common_oh.wait(self.Phone1, 5)
# 控件检查
self.step('contacts 控件检查')
self.common_oh.checkIfTextExist(self.Phone1, '全部通话')
self.common_oh.checkIfTextExist(self.Phone1, '未接来电')
self.common_oh.checkIfTextExist(self.Phone1, '1')
self.common_oh.checkIfTextExist(self.Phone1, '5')
self.common_oh.checkIfTextExist(self.Phone1, '9')
self.common_oh.checkIfTextExist(self.Phone1, '联系人')
self.common_oh.checkIfTextExist(self.Phone1, '收藏')
self.step('contacts截图对比')
# 截图对比
contacts_pic = 'contacts.jpeg'
self.take_picture_to_local(contacts_pic)
self.crop_picture(contacts_pic)
similarity = self.compare_image_similarity(contacts_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(contacts_pic, similarity))
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
def teardown(self):
self.common_oh.forceStopAbility(self.Phone1, self.bundle_name)
# self.collect_hilog('contacts.tar')
self.step('contacts test finish')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["CrashCheck.py"]
}
}

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
from devicetest.api import Asserts
from test_case import ITestCase
class CrashCheck(ITestCase):
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
self.step('开始crash测试')
self.common_oh.remount(self.Phone1)
self.common_oh.wait(self.Phone1, 1)
def process(self):
self.step('获取crash信息')
crashes = self.common_oh.shell(self.Phone1, 'cd /data/log/faultlog/temp && grep "Process name" -rnw ./')
self.step('return: {}'.format(crashes))
self.common_oh.wait(self.Phone1, 1)
Asserts.assert_not_in('foundation', crashes)
Asserts.assert_not_in('render_service', crashes)
Asserts.assert_not_in('appspawn', crashes)
def teardown(self):
self.step('将crash文件压缩打包后回传到本地')
self.common_oh.shell(self.Phone1, 'cd /data/log/faultlog/temp && tar -cf crash_log.tar cppcrash*')
self.common_oh.pullFile(self.Phone1, '/data/log/faultlog/temp/crash_log.tar', self.local_save_path)
self.step('crash check结束')

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["DistributedMusicPlayer.py"]
}
}

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
import os
from devicetest.api import Asserts
from test_case import ITestCase
class DistributedMusicPlayer(ITestCase):
ability_name = 'ohos.samples.distributedmusicplayer.MainAbility'
bundle_name = 'ohos.samples.distributedmusicplayer'
def __init__(self, controllers):
super().__init__(controllers)
# self.tests = [
# 'shot_and_compare',
# ]
def setup(self):
super().setup()
self.step('预置条件DistributedMusicPlayer 测试开始, 启动app')
self.common_oh.startAbility(self.Phone1, self.ability_name, self.bundle_name)
def process(self):
self.common_oh.wait(self.Phone1, 5)
self.step('点击允许')
self.common_oh.wait(self.Phone1, 2)
try:
self.common_oh.touchByText(self.Phone1, '允许')
except:
pass
for i in range(3):
self.common_oh.click(self.Phone1, 540, 1050, mode='NORMAL')
self.step('控件检查')
# 控件检查
self.common_oh.checkIfTextExist(self.Phone1, 'dynamic.wav')
self.common_oh.checkIfKeyExist(self.Phone1, 'image1')
self.common_oh.checkIfKeyExist(self.Phone1, 'image2')
self.common_oh.checkIfKeyExist(self.Phone1, 'image3')
self.common_oh.checkIfKeyExist(self.Phone1, 'image4')
self.step('截图对比')
pic_name = 'distributedmusicplayer.jpeg'
self.take_picture_to_local(pic_name)
self.crop_picture(pic_name)
similarity = self.compare_image_similarity(pic_name)[0]
self.step('{}和标准图的相似度为{}%'.format(pic_name, similarity))
# 控件对比和截图对比有一个成功就认为pass
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
def teardown(self):
self.common_oh.forceStopAbility(self.Phone1, self.bundle_name)
self.common_oh.cleanApplicationData(self.Phone1, self.bundle_name)
# self.collect_hilog('DistributedMusicPlayer.tar')
self.step('DistributedMusicPlayer测试结束')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["Launcher.py"]
}
}

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
import os
import re
import sys
from devicetest.api import Asserts
from test_case import ITestCase
from xdevice import DeviceState
class Launcher(ITestCase):
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
self.step('SmokeTest start')
super().setup()
def process(self):
normal = False
for retry in range(3):
self.common_oh.shell(self.Phone1, 'mkdir -p /data/local/tmp/screen_test/train_set')
self.step('{} 次唤醒设备({})'.format(retry, self.device_name))
self.common_oh.wake(self.Phone1)
self.common_oh.goHome(self.Phone1)
# 屏幕常亮
self.common_oh.shell(self.Phone1, 'power-shell setmode 602')
self.common_oh.wait(self.Phone1, 2)
# 收集hilog
self.collect_hilog('system_start_log_{}.tar'.format(self.device_name))
# 检查设备是否连接
assert self.Phone1.device_state == DeviceState.CONNECTED, AssertionError('device unconnected')
self.step('device connected')
# 检查屏幕点亮状态
self.check_power_state()
# 控件检查
try:
self.common_oh.checkIfTextExist(self.Phone1, '相机')
self.common_oh.checkIfTextExist(self.Phone1, '备忘录')
self.common_oh.checkIfTextExist(self.Phone1, 'SmartPerf')
self.common_oh.checkIfTextExist(self.Phone1, '计算器')
self.common_oh.checkIfTextExist(self.Phone1, '音乐')
self.common_oh.checkIfTextExist(self.Phone1, '时钟')
component_exist = True
except:
component_exist = False
# 截图对比
launcher_pic = 'launcher.jpeg'
self.take_picture_to_local(launcher_pic)
similarity = self.compare_image_similarity(launcher_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(launcher_pic, similarity))
cmp_rst = similarity >= 60
if component_exist and cmp_rst:
normal = True
break
else:
self.step('SmokeTest: launcher failed, reboot and try!!!')
self.common_oh.shell(self.Phone1, 'rm -rf /data/*;reboot')
self.common_oh.wait(self.Phone1, 50)
if not normal:
device_num = ''
self.common_oh.shell(self.Phone1, 'cd /data/log/faultlog/temp && tar -cf after_test_cppcrash{}.tar cppcrash*'.format(device_num))
self.common_oh.pullFile(self.Phone1, '/data/log/faultlog/temp/after_test_cppcrash{}.tar'.format(device_num), os.path.normpath(self.local_save_path))
# fault logger
self.common_oh.shell(self.Phone1, 'cd /data/log/faultlog/faultlogger && tar -cf after_test_jscrash{}.tar jscrash*'.format(device_num))
self.common_oh.pullFile(self.Phone1, '/data/log/faultlog/faultlogger/after_test_jscrash{}.tar'.format(device_num), os.path.normpath(self.local_save_path))
self.step('冒烟测试失败: 无法进系统或者进桌面主页检查出了问题!')
self.step('结束冒烟测试!')
Asserts.assert_true(normal)
def check_power_state(self):
power_state = self.common_oh.shell(self.Phone1, 'hidumper -s 3308')
self.common_oh.wait(self.Phone1, 2)
self.step('hidumper -s 3308 return: {}'.format(power_state))
Asserts.assert_true(('State=1' in power_state) or ('State=2' in power_state))
def teardown(self):
self.common_oh.shell(self.Phone1, 'cat /proc/`pidof foundation`/smaps_rollup')
self.step('Launcher finish')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["Mms.py"]
}
}

View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
import os
from devicetest.api import Asserts
from test_case import ITestCase
class Mms(ITestCase):
ability_name = 'com.ohos.mms.MainAbility'
bundle_name = 'com.ohos.mms'
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
super().setup()
self.step('MMS test start, start app')
self.common_oh.startAbility(self.Phone1, self.ability_name, self.bundle_name)
def process(self):
self.common_oh.wait(self.Phone1, 5)
self.step('控件检查')
# 控件检查
self.common_oh.checkIfTextExist(self.Phone1, '信息')
self.common_oh.checkIfTextExist(self.Phone1, '没有会话信息')
# 截图对比
self.step('mms 截图对比')
mms_pic = 'mms.jpeg'
self.take_picture_to_local(picture_name=mms_pic)
self.crop_picture(mms_pic)
similarity = self.compare_image_similarity(mms_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(mms_pic, similarity))
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
def teardown(self):
self.common_oh.forceStopAbility(self.Phone1, self.bundle_name)
self.step('MMS test finish')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["Note.py"]
}
}

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
import os
from devicetest.api import Asserts
from test_case import ITestCase
class Note(ITestCase):
ability_name = 'MainAbility'
bundle_name = 'com.ohos.note'
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
self.step('预置条件Note测试开始启动app')
super().setup()
self.common_oh.startAbility(self.Phone1, self.ability_name, self.bundle_name)
def process(self):
self.common_oh.wait(self.Phone1, 5)
for i in range(2):
self.step('步骤1{} 次点击允许'.format(i))
self.common_oh.click(self.Phone1, 530, 1100, mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
self.step('步骤2点击数学公式')
self.common_oh.touchByText(self.Phone1, '数学公式', mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
self.step('步骤3点击屏幕弹出输入法')
self.common_oh.click(self.Phone1, 360, 280, mode='NORMAL')
self.common_oh.wait(self.Phone1, 3)
# 控件检查
self.step('步骤4控件检查')
self.common_oh.checkIfTextExist(self.Phone1, '数学公式')
self.common_oh.checkIfTextExist(self.Phone1, '未分类')
self.common_oh.checkIfTextExist(self.Phone1, 'resource:/RAWFILE/editor.html')
self.common_oh.checkIfTextExist(self.Phone1, 'space')
self.step('步骤5截图对比')
# 截图对比
note_pic = 'note.jpeg'
self.take_picture_to_local(note_pic)
self.crop_picture(note_pic)
similarity = self.compare_image_similarity(note_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(note_pic, similarity))
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
def teardown(self):
self.step('收尾Note test finish')
self.step('收尾1点击home键')
self.common_oh.click(self.Phone1, 515, 1240, mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
self.step('收尾2清理最近的任务')
self.common_oh.click(self.Phone1, 360, 1170, mode='NORMAL')
self.common_oh.wait(self.Phone1, 5)
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["NotificationBar.py"]
}
}

View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
import os
from devicetest.api import Asserts
from test_case import ITestCase
class NotificationBar(ITestCase):
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
super().setup()
self.step('预置条件Notification Bar测试开始')
def process(self):
for i in range(2):
self.step('步骤1{} 次下拉控制中心'.format(i))
self.common_oh.swipe(self.Phone1, x1=500, y1=0, x2=500, y2=80)
self.common_oh.wait(self.Phone1, 1)
self.common_oh.wait(self.Phone1, 5)
self.step('步骤2控制中心控件检查')
# 控件检查
self.common_oh.checkIfTextExist(self.Phone1, '控制中心')
self.common_oh.checkIfTextExist(self.Phone1, 'WLAN')
self.common_oh.checkIfTextExist(self.Phone1, '截屏')
self.common_oh.checkIfTextExist(self.Phone1, '位置信息')
self.common_oh.checkIfTextExist(self.Phone1, '飞行模式')
self.common_oh.checkIfTypeExist(self.Phone1, 'Slider')
# 截图对比
self.step('步骤3控制中心截图对比')
notification_pic = 'notification_bar.jpeg'
self.take_picture_to_local(notification_pic)
self.crop_picture(notification_pic)
similarity = self.compare_image_similarity(notification_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(notification_pic, similarity))
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
def teardown(self):
self.step('收尾Notification Bar测试结束')
for i in range(2):
self.step('{} 次上滑收起控制中心'.format(i))
self.common_oh.swipe(self.Phone1, x1=500, y1=500, x2=500, y2=300)
self.common_oh.wait(self.Phone1, 1)
# self.collect_hilog('NotificationBar.tar')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["Photos.py"]
}
}

View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
import os
from devicetest.api import Asserts
from test_case import ITestCase
class Photos(ITestCase):
photo_ability_name = 'com.ohos.photos.MainAbility'
photo_bundle_name = 'com.ohos.photos'
shot_ability_name = 'com.ohos.screenshot.ServiceExtAbility'
shot_bundle_name = 'com.ohos.screenshot'
def __init__(self, controllers):
super().__init__(controllers)
# self.tests = [
# 'pull_down_notification_bar',
# 'screenshot_x_y',
# 'start_photos',
# 'last_photos_x_y',
# 'shot_and_compare',
# 'process_and_sandbox_path_check',
# ]
def setup(self):
super().setup()
self.step('预置条件准备Photos测试')
def process(self):
self.step('步骤1下拉控制中心')
self.common_oh.swipe(self.Phone1, x1=500, y1=0, x2=500, y2=80)
self.common_oh.wait(self.Phone1, 2)
self.step('步骤2点击截屏快捷方式')
self.common_oh.click(self.Phone1, 115, 480, 'NORMAL')
self.common_oh.wait(self.Phone1, 5)
self.step('步骤3启动相册app')
self.common_oh.startAbility(self.Phone1, self.photo_ability_name, self.photo_bundle_name)
self.common_oh.wait(self.Phone1, 5)
self.step('步骤4点击最近')
self.common_oh.click(self.Phone1, 100, 220, 'NORMAL')
self.common_oh.wait(self.Phone1, 2)
# 控件检查
self.step('步骤5控件检查')
self.common_oh.checkIfKeyExist(self.Phone1, 'ActionBarButtonBack')
self.common_oh.checkIfKeyExist(self.Phone1, 'ToolBarButtonFavor')
self.common_oh.checkIfKeyExist(self.Phone1, 'ToolBarButtonDelete')
self.common_oh.checkIfTextExist(self.Phone1, '收藏')
self.common_oh.checkIfTextExist(self.Phone1, '删除')
# 截图对比
self.step('步骤6截图对比')
photos_pic = 'photos.jpeg'
self.take_picture_to_local(photos_pic)
self.crop_picture(photos_pic)
similarity = self.compare_image_similarity(photos_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(photos_pic, similarity))
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
self.step('步骤7medialibrarydata进程检查')
process = 'com.ohos.medialibrary.medialibrarydata'
Asserts.assert_true(self.common_oh.isProcessRunning(self.Phone1, process))
self.common_oh.wait(self.Phone1, 1)
# sandbox path检查
self.step('步骤7检查sandbox path')
pid_num = self.common_oh.shell(self.Phone1, 'pgrep -f {}'.format(process)).strip()
self.common_oh.wait(self.Phone1, 1)
sanboxf = self.common_oh.shell(self.Phone1, 'echo \"ls /storage/media/local/\"|nsenter -t {} -m sh'.format(pid_num))
self.common_oh.wait(self.Phone1, 1)
Asserts.assert_in('files', sanboxf)
def teardown(self):
self.step('stop photo app')
self.common_oh.forceStopAbility(self.Phone1, self.photo_bundle_name)
# self.collect_hilog('Photos.tar')
self.step('photo test finish')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["ProcessCheck.py"]
}
}

View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
import os
import sys
from devicetest.api import Asserts
from test_case import ITestCase
class ProcessCheck(ITestCase):
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
super().setup()
self.step('SmokeTest: ########## First check key processes start ##############')
def process(self):
Asserts.assert_true(os.path.exists(os.path.join(self.local_resource_path, 'process.txt')))
self.step('get process.txt content')
with open(os.path.join(self.local_resource_path, 'process.txt'), 'r+') as f:
text = f.read()
two_check_process_list = text.split('#####')[1].split()[0:-1]
other_process_list = text.split('#####')[2].split()
lose_process = []
for pname in two_check_process_list:
pids = self.common_oh.shell(self.Phone1, 'pidof {}'.format(pname))
try:
pidlist = pids.split()
for pid in pidlist:
int(pid)
self.step('{} pid is {}'.format(pname, pid))
except:
lose_process.append(pname)
self.common_oh.wait(self.Phone1, 1)
all_p = self.common_oh.shell(self.Phone1, 'ps -elf')
for pname in other_process_list:
if pname not in all_p:
lose_process.append(pname)
self.step('lose process is: {}'.format(lose_process))
if lose_process:
self.step('SmokeTest: error: {}, These processes do not exist!!!'.format(lose_process))
device_num = ''
self.common_oh.shell(self.Phone1, 'cd /data/log/faultlog/temp && tar -cf after_test_cppcrash{}.tar cppcrash*'.format(device_num))
self.common_oh.pullFile(self.Phone1, '/data/log/faultlog/temp/after_test_cppcrash{}.tar'.format(device_num), os.path.normpath(self.local_save_path))
# fault logger
self.common_oh.shell(self.Phone1, 'cd /data/log/faultlog/faultlogger && tar -cf after_test_jscrash{}.tar jscrash*'.format(device_num))
self.common_oh.pullFile(self.Phone1, '/data/log/faultlog/faultlogger/after_test_jscrash{}.tar'.format(device_num), os.path.normpath(self.local_save_path))
self.step('SmokeTest: SmokeTest find some key problems!')
self.step('SmokeTest: End of check, test failed!')
Asserts.assert_true(len(lose_process) == 0)
self.step('process check pass')
def teardown(self):
self.step('SmokeTest: first processes check is ok')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["SettingsWifi.py"]
}
}

View File

@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
import json
import os
from devicetest.api import Asserts
from devicetest.aw.OpenHarmony import WifiHelper
from test_case import ITestCase
class SettingsWifi(ITestCase):
app_name = 'settings_wifi'
ability_name = 'com.ohos.settings.MainAbility'
bundle_name = 'com.ohos.settings'
def __init__(self, controllers):
super().__init__(controllers)
def setup(self):
super().setup()
self.step('开始wifi测试启动设置应用')
def process(self):
self.enter_setting_page()
self.enter_wlan_page()
self.connect_wifi()
def enter_setting_page(self):
self.common_oh.startAbility(self.Phone1, self.ability_name, self.bundle_name)
self.common_oh.wait(self.Phone1, 5)
self.step('控件检查')
# 控件检查
self.common_oh.checkIfTextExist(self.Phone1, '设置')
self.common_oh.checkIfTextExist(self.Phone1, '搜索设置项')
self.common_oh.checkIfTextExist(self.Phone1, 'WLAN')
self.common_oh.checkIfTextExist(self.Phone1, '声音')
self.common_oh.checkIfTextExist(self.Phone1, '应用')
# 截图对比
self.step('截图对比')
settings_pic = 'settings.jpeg'
self.take_picture_to_local(settings_pic)
self.crop_picture(settings_pic)
similarity = self.compare_image_similarity(settings_pic)[0]
self.step('{}和标准图的相似度为{}%'.format(settings_pic, similarity))
Asserts.assert_greater_equal(similarity, self.STANDARD_SIMILARITY)
def enter_wlan_page(self):
self.step('进入WLAN页面')
# 点击wlan
self.common_oh.touchByText(self.Phone1, 'WLAN', mode='NORMAL')
self.common_oh.wait(self.Phone1, 3)
# 打开wlan
toggle = self.common_oh.getWidgetProperties(self.Phone1, 'type/Toggle')
properties = json.loads(toggle)
if properties.get('checked') is False:
self.common_oh.touchByType(self.Phone1, 'Toggle')
self.common_oh.wait(self.Phone1, 5)
wlan_list_pic = 'wlan_list.jpeg'
self.take_picture_to_local(wlan_list_pic)
WifiHelper.checkWifiState(self.Phone1)
# assert WifiHelper.checkWifiState(self.Phone1), AssertionError('failed to turn on wifi')
# self.step('wifi has turned on')
def connect_wifi(self):
self.connect_by_click_point()
# 上面的靠坐标点击的方式容错率低直接采用Wifi模块
# self.connect_by_wifi_helper()
wifi_pic = 'wifi.jpeg'
self.take_picture_to_local(wifi_pic)
self.common_oh.wait(self.Phone1, 1)
wifi_cs_pic = 'wifi_connection_status.jpeg'
self.take_picture_to_local(wifi_cs_pic)
self.common_oh.wait(self.Phone1, 1)
def connect_by_wifi_helper(self):
pwd = 'passw0rd1!'
try:
# WifiHelper.connectWifi(self.Phone1, 'testapold', pwd)
WifiHelper.connectWifi(self.Phone1, 'testapold_Wi-Fi5', pwd)
except:
try:
WifiHelper.connectWifi(self.Phone1, 'testapold', pwd)
except:
pass
# WifiHelper.connectWifi(self.Phone1, 'testapold_Wi-Fi5', pwd)
self.common_oh.wait(self.Phone1, 20)
def connect_by_click_point(self):
self.step('点击待连接的wifi')
try:
self.common_oh.touchByText(self.Phone1, 'testapold', mode='NORMAL')
except:
try:
self.common_oh.touchByText(self.Phone1, 'testapold_Wi-Fi5', mode='NORMAL')
except:
pass
try:
self.step('点击密码输入框')
self.common_oh.wait(self.Phone1, 1)
self.common_oh.click(self.Phone1, 200, 200, mode='NORMAL')
self.common_oh.wait(self.Phone1, 1)
for i in range(3):
if self.common_oh.checkIfTextExist(self.Phone1, '?123', 'CONTAINS'):
# 双击切换到数字输入界面再切回来,使输入法为小写状态
self.common_oh.click(self.Phone1, 60, 1150, mode='DOUBLE')
break
elif self.common_oh.checkIfTextExist(self.Phone1, 'ABC', 'CONTAINS'):
# 双击切换到数字输入界面再切回来,使输入法为小写状态
self.common_oh.click(self.Phone1, 60, 1150, mode='NORMAL')
break
# 切换为大写
# self.common_oh.click(self.Phone1, 60, 1040, mode='DOUBLE')
# 密码: passw0rd1!
# P
self.common_oh.click(self.Phone1, 678, 800, mode='NORMAL')
# A
self.common_oh.click(self.Phone1, 80, 920, mode='NORMAL')
# S S
self.common_oh.click(self.Phone1, 150, 920, mode='NORMAL')
self.common_oh.click(self.Phone1, 150, 920, mode='NORMAL')
# W
self.common_oh.click(self.Phone1, 110, 800, mode='NORMAL')
# 切数字输入键盘
self.common_oh.click(self.Phone1, 60, 1150, mode='NORMAL')
# 0
self.common_oh.click(self.Phone1, 678, 800, mode='NORMAL')
# 切回字母输入界面
self.common_oh.click(self.Phone1, 60, 1150, mode='NORMAL')
# self.common_oh.click(self.Phone1, 60, 1040, mode='DOUBLE')
# R
self.common_oh.click(self.Phone1, 250, 800, mode='NORMAL')
# D
self.common_oh.click(self.Phone1, 220, 920, mode='NORMAL')
# 切数字输入键盘
self.common_oh.click(self.Phone1, 60, 1150, mode='NORMAL')
# 1
self.common_oh.click(self.Phone1, 30, 800, mode='NORMAL')
# !
self.common_oh.click(self.Phone1, 500, 1040, mode='NORMAL')
# 点击输入框右边的眼睛查看密码
self.common_oh.click(self.Phone1, 655, 200, mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
self.take_picture_to_local('password.jpeg')
# 收起输入法
self.common_oh.click(self.Phone1, 675, 700, mode='NORMAL')
self.common_oh.wait(self.Phone1, 2)
# 点击连接
self.common_oh.touchByText(self.Phone1, '连接', mode='NORMAL')
self.common_oh.wait(self.Phone1, 25)
except:
self.step('SmokeTest: wifi list loading error!')
def teardown(self):
WifiHelper.closeWifi(self.Phone1)
self.common_oh.forceStopAbility(self.Phone1, self.bundle_name)
self.step('wifi test finish, stop setting app, go home')
# self.collect_hilog('SettingsWifi.tar')
super().teardown()

View File

@ -0,0 +1,13 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["VideoTest.py"]
}
}

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
import os.path
from devicetest.api import Asserts
from test_case import ITestCase
class VideoTest(ITestCase):
bundle_name = 'ohos.acts.multimedia.video.videoplayer'
def __init__(self, controllers):
super().__init__(controllers)
self.video_hap_path = os.path.join(self.local_resource_path, 'videotest', 'ActsVideoPlayerJsTest.hap')
self.mp4_path = os.path.join(self.local_resource_path, 'videotest', 'H264_AAC.mp4')
def setup(self):
super().setup()
self.step('预置条件VideoTest测试开始')
def process(self):
self.step('步骤1安装video hap')
self.common_oh.installApp_r(self.Phone1, self.video_hap_path)
self.common_oh.wait(self.Phone1, 2)
self.step('步骤2创建目录')
self.common_oh.shell(self.Phone1, 'mkdir -p /data/app/el2/100/base/{}/haps/entry/files'.format(self.bundle_name))
self.common_oh.wait(self.Phone1, 1)
self.step('步骤3remount')
self.common_oh.shell(self.Phone1, 'mount -o rw,remount')
self.common_oh.wait(self.Phone1, 1)
self.step('步骤4send file to device')
dev_path = '/data/app/el2/100/base/ohos.acts.multimedia.video.videoplayer/haps/entry/files/'
self.common_oh.pushFile(self.Phone1, self.mp4_path, dev_path)
self.common_oh.wait(self.Phone1, 1)
cmd = 'aa test -p {} -b {} -s unittest OpenHarmonyTestRunner -w 2000000 -s timeout 60000'.format(self.bundle_name, self.bundle_name)
result = self.common_oh.shell(self.Phone1, cmd)
self.common_oh.wait(self.Phone1, 5)
key_words = 'Failure: 0, Error: 0, Pass: 1'
Asserts.assert_true(key_words in result)
def teardown(self):
self.step('收尾停止Video app')
self.common_oh.forceStopAbility(self.Phone1, self.bundle_name)
# self.collect_hilog('video_log.tar')
self.step('VideoTest finish')
super().teardown()

View File

@ -0,0 +1,128 @@
import os
import cv2
import numpy
from PIL import Image
from devicetest.aw.OpenHarmony import CommonOH
from devicetest.controllers.cv import compare_image_similarity
from devicetest.core.test_case import TestCase, Step
class ITestCase(TestCase):
def __init__(self, controllers):
self.TAG = self.__class__.__name__
TestCase.__init__(self, self.TAG, controllers)
self.device_name = self.Phone1.device_sn
# path 相关
self.device_save_path = '/data/local/tmp/screen_test/'
self.testcases_path = os.path.dirname(__file__)
self.local_resource_path = os.path.join(os.path.dirname(self.testcases_path), 'resource')
self.local_save_path = self.cur_case.case_screenshot_dir
if not os.path.exists(self.local_save_path):
os.makedirs(self.local_save_path, exist_ok=True)
# framework utils
self.STANDARD_SIMILARITY = 60
self.common_oh = CommonOH
self.step = Step
self.step('testcase path is: {}'.format(self.testcases_path))
self.step('local resource path is: {}'.format(self.local_resource_path))
self.step('local save path is: {}'.format(self.local_save_path))
def setup(self):
self.common_oh.wake(self.Phone1)
self.common_oh.goHome(self.Phone1)
self.step('start log')
# self.common_oh.shell(self.Phone1, 'rm -rf /data/log/hilog/* && hilog -r && hilog -Q pidoff;hilog -G 512M;hilog -w start -l 400000000 -m none')
self.common_oh.wait(self.Phone1, 1)
def take_picture_to_local(self, picture_name):
"""
将图片从设备上传回本地
:param picture_name:
:return:
"""
self.step('delete history screen shot picture')
self.common_oh.removeFolderByCMD(self.Phone1, '{}*{}'.format(self.device_save_path, picture_name))
self.step('shot new picture')
self.common_oh.takePictureByCMD(self.Phone1, '{}{}_{}'.format(self.device_save_path, self.device_name, picture_name))
self.step('pull picture to local')
self.common_oh.pullFile(self.Phone1, '{}{}_{}'.format(self.device_save_path, self.device_name, picture_name), self.local_save_path)
self.common_oh.wait(self.Phone1, 2)
# def compare_image_similarity(self, picture_name, similar=0.95):
# """
# :param picture_name:
# :param similar:
# :return:
# """
# src_image_path = os.path.join(self.local_save_path, '{}_{}'.format(self.device_name, picture_name))
# target_image_path = os.path.join(self.local_resource_path, picture_name)
# self.step('compare picture: {}>>>'.format(picture_name))
# self.step('src image path({}) exist?{}'.format(src_image_path, os.path.exists(src_image_path)))
# self.step('target image path({}) exist?{}'.format(target_image_path, os.path.exists(target_image_path)))
# return compare_image_similarity(self.Phone1, src_image_path, target_image_path, similar)
def crop_picture(self, picture, crop_range=None):
"""
对图片进行裁剪
:param picture:待裁剪的图片路径
:param crop_range: 裁剪的尺寸[80, 1200, 0, 720] 表示纵向80~1200横向0~720的裁剪范围基本就是去掉上面的状态栏和下面的导航栏
:return:
"""
picture = os.path.join(self.local_save_path, '{}_{}'.format(self.device_name, picture))
if crop_range is None:
crop_range = [80, 1200, 0, 720]
img = cv2.imread(picture)
img = img[crop_range[0]: crop_range[1], crop_range[2]: crop_range[3]]
cv2.imwrite(picture, img)
def teardown(self):
self.common_oh.goHome(self.Phone1)
def collect_hilog(self, log_name):
self.step('stop hilog')
self.common_oh.shell(self.Phone1, 'hilog -w stop')
self.common_oh.wait(self.Phone1, 1)
self.step('compress hilog')
self.common_oh.shell(self.Phone1, 'cd /data/log/hilog && tar -cf {} *'.format(log_name))
self.common_oh.wait(self.Phone1, 1)
self.step('transfer {} from device to {}'.format(log_name, self.local_save_path))
self.common_oh.pullFile(self.Phone1, '/data/log/hilog/{}'.format(log_name), os.path.normpath(self.local_save_path))
self.common_oh.wait(self.Phone1, 1)
def compare_image_similarity(self, picture_name):
src_image_path = os.path.join(self.local_save_path, '{}_{}'.format(self.device_name, picture_name))
target_image_path = os.path.join(self.local_resource_path, picture_name)
self.step('compare picture: {}>>>'.format(picture_name))
self.step('src image path({}) exist?{}'.format(src_image_path, os.path.exists(src_image_path)))
self.step('target image path({}) exist?{}'.format(target_image_path, os.path.exists(target_image_path)))
size = (256, 256)
image1 = Image.open(src_image_path)
image2 = Image.open(target_image_path)
image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR)
image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR)
image1 = cv2.resize(image1, size)
image2 = cv2.resize(image2, size)
sub_image1 = cv2.split(image1)
sub_image2 = cv2.split(image2)
sub_data = 0
for im1, im2 in zip(sub_image1, sub_image2):
sub_data += self.__calculate__(im1, im2)
sub_data = sub_data / 3
return sub_data * 100
def __calculate__(self, img1, img2):
image1 = cv2.cvtColor(numpy.asarray(img1), cv2.COLOR_RGB2BGR)
image2 = cv2.cvtColor(numpy.asarray(img2), cv2.COLOR_RGB2BGR)
hist1 = cv2.calcHist([image1], [0], None, [256], [0.0, 255.0])
hist2 = cv2.calcHist([image2], [0], None, [256], [0.0, 255.0])
degree = 0
for i in range(len(hist1)):
if hist1[i] != hist2[i]:
degree = degree + (1 - abs(hist1[i] - hist2[i]) / max(hist1[i], hist2[i]))
else:
degree = degree + 1
degree = degree / len(hist1)
return degree