删除废弃代码

Signed-off-by: liyanlin02 <liyanlin15@huawei.com>
This commit is contained in:
liyanlin02 2024-03-18 16:35:18 +08:00
parent 266d36c775
commit 0c904d3469
31 changed files with 13 additions and 2235 deletions

View File

@ -1,47 +0,0 @@
[
{
"bundle&processName": "com.ohos.launcher",
"apl": "2"
},
{
"bundle&processName": "com.ohos.settings",
"apl": "2"
},
{
"bundle&processName": "com.ohos.systemui",
"apl": "2"
},
{
"bundle&processName": "com.ohos.screenlock",
"apl": "2"
},
{
"bundle&processName": "com.ohos.adminprovisioning",
"apl": "2"
},
{
"bundle&processName": "edm",
"apl": "3"
},
{
"bundle&processName": "com.ohos.settings.faceauth",
"apl": "2"
},
{
"bundle&processName": "cn.openharmony.inputmethodchoosedialog",
"apl":"3"
},
{
"bundle&processName":"media_service",
"apl":"3"
},
{
"bundle&processName":"com.ohos.amsdialog",
"apl":"3"
},
{
"bundle&processName":"com.ohos.useriam.authwidget",
"apl":"2"
}
]

View File

@ -1,61 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
# PATH='D:\\repo_test\\APL_compare_02\\'
# PATH=os.getcwd()+'/'
PATH = os.path.dirname(os.path.realpath(__file__)) + os.sep
# read_excel.py
'''
SHEET_NAMEexcel中的表名中英文都可
COLSexcel中的列号从0开始
SVNSVN的安装目录下/bin目录SVN在环境变量中的位置
SVN_URLexcel文件对应的url
USERsvn的用户名
PWDsvn的密码
FILE_PATH本地下载文件的路径
'''
SHEET_NAME="Sheet1"
COLS=[1,3]
SVN='D:/TortoiseSVN/bin'
SVN_URL='https://PMAIL_2140981.china.huawei.com/svn/test测试/01 目录/01_1 目录/APL基线标准v1.0.xlsx'
USER='hhhhs'
PWD='123456'
FILE_PATH=PATH+SVN_URL.split('/')[-1]
# read_device.py
'''
SQL_SRC设备上的数据库路径
SQL_DES本地下载文件路径
DOWNLOAD_DB从设备下载的hdc命令
QUERY_HAP_APL查询HAP APL的sql语句查询多列可以依次添加字段添加字段的顺序为比较时的字段优先级
QUERY_NATIVE_APL查Native APL的sql语句
'''
SQL_SRC=" /data/service/el1/public/access_token/access_token.db"
SQL_DES=PATH
DOWNLOAD_DB="hdc -t {} file recv"
QUERY_HAP_APL="select bundle_name,apl from hap_token_info_table"
QUERY_NATIVE_APL="select process_name,apl from native_token_info_table"
'''
APL_LOG_FILE执行脚本的日志信息
APL_RECORD_PATHAPL对比记录的日志信息
IS_OVERWRITE是否覆盖之前的APL日志w表示覆盖a表示追加
'''
APL_LOG_FILE=PATH+'apl_compare.log'
APL_RECORD_PATH=PATH+'apl_record.txt'
IS_OVERWRITE='w'

View File

@ -1,95 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
import math
import enum
import time
import logging
import threading
from apl_config import *
log_tag = 'common'
apl_file_log = logging.FileHandler(filename=APL_LOG_FILE, mode='a', encoding='utf-8')
fmt = logging.Formatter(fmt="%(asctime)s %(message)s", datefmt='%Y-%m-%d %H:%M:%S %a')
apl_file_log.setFormatter(fmt)
# 定义日志
apl_logger = logging.Logger(name = 'apl_compare_log', level=logging.INFO)
apl_logger.addHandler(apl_file_log)
class ErrorType(enum.Enum):
not_in_apl_table = 1
apl_is_invalid = 2
class ApiLevel(enum.Enum):
normal = 1
system_basic = 2
system_core = 3
class LogLevel(enum.Enum):
Error = 1
Info = 2
class AplCompareException(Exception):
def __init__(self, msg):
self.msg = msg
class AplCompareThread(threading.Thread):
def __init__(self, func, args=()):
super(AplCompareThread, self).__init__()
self.func = func
self.args = args
self.result = None
def run(self):
self.result = self.func(*self.args)
def get_result(self):
threading.Thread.join(self)
try:
return self.result
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.args[0]))
return None
def apl_log(msg):
# 写日志
apl_logger.info(msg)
def apl_set_log_content(level, tag, msg):
log_content = timestamp() + ' {}'.format(level) + ' [{}]'.format(tag) + ' {}'.format(msg)
print(log_content)
apl_log(log_content)
return(log_content)
def set_error_record(name,error):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())+' %(name)-50s: %(error)-50s\n'%{'name':name,'error':error}
def set_map(results):
if results == None:
return None
res_map = {}
for result in results:
res_map[result[0]] = set_value(result[1:])
return res_map
def set_value(result):
value = []
for res in result:
if math.isnan(res):
res = 0
value.append(res)
return value
def timestamp():
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

View File

@ -1,200 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
import time
import sys
import os
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from read_device import *
from read_excel import *
from apl_config import *
def whitelist_check(apl, value, fields_from_whitelist):
# True 包含在白名单内
check = value in fields_from_whitelist.keys()
is_pass = False
if check and str(apl) == fields_from_whitelist[value]:
is_pass = True
return is_pass
def compare_hap_apl(fields_from_device, fields_from_whitelist):
records = []
log_tag = 'compare_hap_apl'
hap_check = True
for value in fields_from_device:
apl = fields_from_device[value][0]
if apl > 1:
is_pass = whitelist_check(apl, value, fields_from_whitelist)
info = 'bundleName = {} apl = {}'.format(value, str(apl))
if is_pass == False:
hap_check = False
# info = value
# info = 'bundleName = {} apl = {}'.format(value, str(apl))
log_content = apl_set_log_content(LogLevel(1).name, log_tag, info)
records.append(log_content)
else:
apl_set_log_content(LogLevel(2).name, log_tag, info)
return records, hap_check
def compare_native_apl(fields_from_device, fields_from_whitelist):
records = []
log_tag = 'compare_native_apl'
native_check = True
for value in fields_from_device:
apl = fields_from_device[value][0]
if apl > 2:
info = 'processName = {} apl = {}'.format(value, str(apl))
is_pass = whitelist_check(apl, value, fields_from_whitelist)
if is_pass == False:
native_check = False
log_content = apl_set_log_content(LogLevel(1).name, log_tag, info)
records.append(log_content)
else:
apl_set_log_content(LogLevel(2).name, log_tag, info)
return records, native_check
def fields_compare_write_once(fields_from_device,fields_from_excel):
records=[]
for bundle_name in fields_from_device.keys():
if bundle_name not in fields_from_excel.keys():
record=(bundle_name,ErrorType(1).name)
records.append(record)
continue
fields=fields_from_device[bundle_name]
standard_fields=fields_from_excel[bundle_name]
if not isInvalid(fields,standard_fields):
record=(bundle_name,ErrorType(2).name)
records.append(record)
print('Compare successful!')
return records
def isInvalid(fields,standard_fields):
if len(fields) == 1:
return fields[0] <= standard_fields[0]
for field, standard_field in fields, standard_fields:
if field>standard_field:
return False
return True
def write_record(name,error):
try:
file = open(APL_RECORD_PATH,'a')
err_record = set_error_record(name, error)
file.write(err_record)
file.close()
except Exception as e:
log_content=apl_set_log_content(str(s))
apl_log(log_content)
def write_record_once(err_records,is_overwrite):
try:
file=open(APL_RECORD_PATH,is_overwrite)
for record in err_records:
err_record = set_error_record(record[0],record[1])
file.write(err_record)
file.close()
except Exception as e:
log_content=apl_set_log_content(str(e))
apl_log(log_content)
def excel_thread():
try:
# settings={
# ' svn': SVN,
# 'url': url_encode(SVN_URL),
# 'user': USER,
# 'pwd': PWD,
# 'dir': FILE_PATH,
# }
# excel_file = FILE_PATH #svn_checkout(settings)
log_tag = 'excel_thread'
# if excel_file == None:
# apl_set_log_content(LogLevel(2).name, log_tag, 'svn_checkoutc failed') #raise
# apl_from_excel = read_excel(excel_file, sheet = SHEET_NAME, cols = COLS)
# path = PATH + 'APL基线标准v1.0.json'
path = PATH + 'temp.json'
apl_from_json = read_json(path)
return apl_from_json
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, 'excel_thread catch error: {}'.format(e.args[0]))
return None
def sql_thread(sn, sn2):
try:
print(DOWNLOAD_DB.format(sn)+' ' + SQL_SRC + ' ' + SQL_DES)
print()
log_tag = 'sql_thread'
sql_file = download_from_device(DOWNLOAD_DB.format(sn), SQL_SRC, SQL_DES)
if sql_file == None:
raise
query_hap_apl_thread = AplCompareThread(query_hap_apl, (sql_file, QUERY_HAP_APL))
query_native_apl_thread = AplCompareThread(query_native_apl, (sql_file, QUERY_NATIVE_APL))
query_hap_apl_thread.start()
query_native_apl_thread.start()
query_native_apl_thread.join()
query_native_apl_thread.join()
hap_apl_map = query_hap_apl_thread.get_result()
native_apl_map = query_native_apl_thread.get_result()
return hap_apl_map, native_apl_map
except:
apl_set_log_content(LogLevel(1).name, log_tag, 'download_from_device failed')
return None,None
def apl_check_main(sn):
try:
log_tag = 'Main'
apl_set_log_content(LogLevel(2).name, log_tag, '--------APL Check Begin!--------')
excel_thr = AplCompareThread(excel_thread)
sql_thr = AplCompareThread(sql_thread, (sn, sn))
excel_thr.start()
sql_thr.start()
excel_thr.join()
sql_thr.join()
apl_from_excel = excel_thr.get_result()
hap_apl_map, native_apl_map = sql_thr.get_result()
if apl_from_excel == None or hap_apl_map == None or native_apl_map == None:
raise
hap_results, hap_check = compare_hap_apl(hap_apl_map, apl_from_excel)
native_results, native_check = compare_native_apl(native_apl_map, apl_from_excel)
write_record_once(hap_results, IS_OVERWRITE)
write_record_once(native_results, 'a')
if native_check == False or hap_check == False:
apl_set_log_content(LogLevel(1).name, log_tag, '--------APL Check failed![hap = {}, native = {}] --------'.format(hap_check, native_check))
apl_set_log_content(LogLevel(2).name, log_tag, '--------APL Check End! --------')
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '--------APL Check failed![hap = False, native = False] --------')
apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e.args[0]))
if __name__ == '__main__':
try:
sn = sys.argv[1]
except:
sn_list = []
result = os.popen('hdc list targets')
res = result.read()
for line in res.splitlines():
sn_list.append(line)
sn = sn_list[0]
apl_check_main(sn)

View File

@ -1,83 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
from subprocess import run
import os
import sqlite3
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from common import *
from apl_config import *
log_tag = 'read_device'
#从设备中导出数据库
def download_from_device(cmd,sql_src,sql_des):
download_cmd=cmd+' '+sql_src+' '+sql_des
apl_set_log_content(LogLevel(2).name, log_tag, 'database start downloading!')
try:
result = os.popen(download_cmd)
stdout = result.read()
print(stdout)
if 'Fail' in stdout:
raise AplCompareException(stdout.replace('\n\n','').replace('[Fail]', ''))
#sql_file=sql_des+'\\'+sql_src.split('/').pop()
sql_file = sql_des+sql_src.split('/').pop()
apl_set_log_content(LogLevel(2).name, log_tag, '{} download successful!'.format(sql_file))
return sql_file
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
def sql_connect(db):
try:
if not os.path.exists(db):
raise AplCompareException('{} is not exists!'.format(db))
conn = sqlite3.connect(db)
return conn
except AplCompareException as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
#数据库语句查询
def query_records(db,sql):
log_content = ''
try:
conn = sql_connect(db)
if conn == None:
raise AplCompareException('{} cannot connect!'.format(db))
cursor = conn.cursor()
cursor.execute(sql)
results = cursor.fetchall()
conn.close()
apl_set_log_content(LogLevel(2).name, log_tag, '"{}" query successful!'.format(sql))
return results
except sqlite3.OperationalError as e:
apl_set_log_content(LogLevel(2).name, log_tag, 'database {}'.format(e.args[0]))
return None
except AplCompareException as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
#查询hap_token_info_table中的bundle_name和apl
def query_hap_apl(db,sql):
results = query_records(db, sql)
return set_map(results)
#查询native_token_info_table中的process_name和apl
def query_native_apl(db,sql):
results = query_records(db, sql)
return set_map(results)

View File

@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
import subprocess
import pandas as pd
import urllib.parse
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from common import *
from apl_config import *
import json
log_tag = 'read_whitelist'
# 全部文件夹检出本地已经安装svn
def svn_checkout(settings):
try:
print(settings['url'])
print(settings['dir'])
os.chdir(settings['svn'])
cmd = 'svn export --force %(url)s %(dir)s --username %(user)s --password %(pwd)s'%settings
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
stdout,stderr = p.communicate()
print(stderr)
if stderr != b'':
raise AplCompareException(str(stderr,'utf-8').replace('\r\n','\t'))
apl_set_log_content(LogLevel(2).name, log_tag, '{} export successful!'.format(settings['dir']))
return settings['dir']
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e.msg))
return None
#url编码
def url_encode(url):
partions=url.split("/",3)
encode_url=partions[0]
partions[-1]=urllib.parse.quote(partions[-1])
for partion in partions[1:]:
encode_url=encode_url+'/'+partion
return encode_url
def read_excel(file, sheet, cols):
try:
df = pd.read_excel(file, sheet_name = sheet, usecols = cols)
data_list = df.values.tolist()
apl_map = set_map(data_list)
apl_set_log_content(LogLevel(2).name, log_tag, '{} read successful!'.format(file))
return apl_map
except (ValueError,FileNotFoundError) as e:
apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e.msg))
return None
def read_json(path):
try:
with open(path, 'r') as f:
file = f.read()
data_list = json.loads(file)
res_dict = set_dict(data_list)
return res_dict
except Exception as e:
apl_set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
return None
def set_dict(data_list: list()):
res_dict = {}
for res in data_list:
res_dict[res['bundle&processName']] = res['apl']
return res_dict

View File

@ -1,121 +0,0 @@
## 版本
python版本3.8.10
pip版本:22.1.2
python依赖
```
pip install pandas
pip install openpyxl
pip install subprocess
```
## 使用
`python compare.py`
## 目录
```
APL_compare
├── apl_config.py # 整个目录中的常量定义
├── read_device.py # 从设备中下载db并解析表和字段的函数
├── read_excel.py # 从excel中解析表和字段的函数
├── compare.py # 脚本运行入口
└── common.py # 公共需要用到的函数
```
## apl_config.py
常量定义
`PATH`:当前目录的地址
### read_excel.py
`SHEET_NAME`excel中的表名
`COLS`excel中的列名下标从0开始
`SVN`SVN的安装目录下的bin目录
`SVN_URL`excel文件在SVN上对应的url
`USER`svn的用户名
`PWD`svn的密码
`FILE_PATH`:本地下载文件的路径
`SQL_SRC`:设备上的数据库路径
`SQL_DES`:本地下载文件路径
`DOWNLOAD_DB`从设备下载的hdc命令
`QUERY_HAP_APL`查询HAP APL的sql语句查询多列可以依次添加字段添加字段的顺序为比较时的字段优先级
`QUERY_NATIVE_APL`查Native APL的sql语句
`APL_LOG_FILE`:执行脚本的日志路径
`APL_RECORD_PATH`APL对比记录的日志路径
`IS_OVERWRITE`是否覆盖之前的APL日志w表示覆盖a表示追加
## read_device.py
用于从设备上导出数据库,并解析表和字段
### 数据库导出
函数:`download_from_device(cmd,sql_src,sql_des)`
hdc命令`cmd`
设备中数据库路径:`sql_src`
本地数据库路径:`sql_des`
执行命令:`hdc file recv sql_src sql_des`
### 连接数据库
相关函数:`sql_connect(db)`
传入参数:`db`--db文件存放路径
返回结果:`conn`--数据库的连接
### sql语句查询
相关函数:`query_records(db,sql)`
传入参数:`db`--需要连接的数据库;`sql`sql查询语句
返回结果:`results`--查询结果
### 查hap_token_info_table中的bundle_name和apl
sql语句`QUERY_HAP_APL="select bundle_name,apl from hap_token_info_table"`
相关函数:`query_hap_apl(db,sql)`
传入参数:`db`--需要连接的数据库;`sql`sql查询语句
返回结果:`res_map`--查询结果转化为的字典mapkey是bundle_namevalue是apl
### 查询native_token_info_table中的process_name和apl
sql语句`QUERY_NATIVE_APL="select process_name,apl from native_token_info_table"`
相关函数:`query_native_apl(db,sql)`
传入参数:`db`--需要连接的数据库;`sql`--sql查询语句
返回结果:`res_map`--查询结果转化为的字典mapkey是process_namevalue是apl
## read_excel.py
### 从svn上下载excel
相关函数:`syn_checkout(settings)`
传入参数:`settings`--包含svn上文件路径本地路径用户名密码
返回结果:`settings['dir']`--本地下载路径
### url编码
相关函数:`url_encode(url)`
传入参数:`url`
返回结果:`encode_url`
### 解析excel
相关函数:`read_excel(file,sheet,cols)`
传入参数:`file`--excel文件`sheet`--表名,`cols`--列名
返回结果:`apl_map`----查询结果转化为的字典mapkey是bundle/process_namevalue是apl
## common.py
### 脚本执行过程中的错误日志
相关函数:`log(msg)`
相关参数:`msg`--错误信息
### 设置脚本执行过程中的日志信息
相关函数:`apl_set_log_content(msg)`
相关参数:`msg`--日志信息,`is_error`--用于判断是执行失败、成功
返回结果:带时间戳的日志信息
### 设置apl记录的格式
相关函数set_error_record(name,error)
相关参数:`name`--bundle name或者native name`error`--错误原因
返回结果:带时间戳的记录
### 将查询结果转化成map的结构
相关函数:`set_map(results)`
传入参数:`results`--查询结果的列表
返回结果:`res_map`
### 转换查询结果map的value格式
相关函数:`set_value(result)`
传入参数:`result`--查询到的每一行结果
返回结果:`value`--包含查询到的字段的列表
### 时间戳
相关函数:`timestamp()`
返回结果:时间戳
### 错误类型
`ErrorType`:枚举类
### 自定义异常
`AplCompareException`
### 自定义线程
`AplCompareThread`
### 日志格式设置
`logging.basicConfig`

View File

@ -1,86 +0,0 @@
[
{
"bundle&processName": "com.ohos.launcher",
"apl": "2"
},
{
"bundle&processName": "com.ohos.settings",
"apl": "2"
},
{
"bundle&processName": "com.ohos.systemui",
"apl": "2"
},
{
"bundle&processName": "com.ohos.screenlock",
"apl": "2"
},
{
"bundle&processName": "com.ohos.adminprovisioning",
"apl": "2"
},
{
"bundle&processName": "edm",
"apl": "3"
},
{
"bundle&processName": "com.ohos.settings.faceauth",
"apl": "2"
},
{
"bundle&processName": "cn.openharmony.inputmethodchoosedialog",
"apl":"3"
},
{
"bundle&processName":"media_service",
"apl":"3"
},
{
"bundle&processName":"com.ohos.amsdialog",
"apl":"3"
},
{
"bundle&processName":"com.ohos.useriam.authwidget",
"apl":"2"
},
{
"bundle&processName":"com.ohos.powerdialog",
"apl":"2"
},
{
"bundle&processName":"com.ohos.filepicker",
"apl":"2"
},
{
"bundle&processName":"com.ohos.camera",
"apl":"3"
},
{
"bundle&processName":"com.ohos.smartperf",
"apl":"2"
},
{
"bundle&processName":"com.ohos.devicemanagerui",
"apl":"2"
},
{
"bundle&processName":"ohos.telephony.resources",
"apl":"3"
},
{
"bundle&processName":"com.ohos.notificationdialog",
"apl":"2"
},
{
"bundle&processName":"ohos.samples.distributedcalc",
"apl":"3"
},
{
"bundle&processName":"ohos.samples.distributedmusicplayer",
"apl":"3"
},
{
"bundle&processName":"com.ohos.note",
"apl":"3"
}
]

View File

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from resolve_token_info import *
from read_acl_whitelist import *
log_tag = 'acl_check'
def whitelist_check(whitelist, acls):
try:
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check',
'-------------------------- Trustlist Verification begin --------------------------')
check_pass = True
for k, v in acls.items():
if k in whitelist.keys():
temp = whitelist[k]
for acl in v:
if acl not in temp:
check_pass = False
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check',
'precessName = {} the acl = {} trustlist is not configured.'.format(k, acl))
else:
check_pass = False
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check', 'precessName = {} the acls = {} trustlist is not configured.'.format(k, v))
if check_pass == False:
raise AclCheckException(
'-------------------------- Trustlist Verification failed --------------------------')
else:
set_log_content(LogLevel(2).name, log_tag + '->whitelist_check',
'-------------------------- Trustlist Verification successful --------------------------')
except Exception as e:
set_log_content(LogLevel(1).name, log_tag + '->whitelist_check', e.msg)
raise
def main(sn):
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- ACL check begin --------------------------')
try:
hdc_command(GENERATING_TOKEN_INFO_COMMAND.format(sn, TOKEN_INFO_URL.format(sn)))
hdc_command(DOWNLOAD_TOKEN_INFO_COMMAND.format(sn, TOKEN_INFO_URL.format(sn), DOWNLOAD_TOKEN_INFO_URL.format(sn)))
hdc_command(CLEAR_TOKEN_INFO_FILE.format(sn, TOKEN_INFO_URL.format(sn)))
file = read_txt(DOWNLOAD_TOKEN_INFO_URL.format(sn))
# clear_token_info_txt(DOWNLOAD_TOKEN_INFO_URL.format(sn))
acls_dict = check_and_get(file)
acl_whitelist = read_json(PATH + 'acl_whitelist.json')
whitelist = get_acl_dict(acl_whitelist)
whitelist_check(whitelist, acls_dict)
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
set_log_content(LogLevel(1).name, log_tag,
'-------------------------- ACL check failed --------------------------')
finally:
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- ACL check end --------------------------')
if __name__ == '__main__':
sn = sys.argv[1]
main(sn)

View File

@ -1,180 +0,0 @@
[
{
"processName": "hiview",
"acls": [
"ohos.permission.DUMP"
]
},
{
"processName": "privacy_service",
"acls": [
"ohos.permission.MANAGE_DISPOSED_APP_STATUS"
]
},
{
"processName": "inputmethod_service",
"acls": [
"ohos.permission.INPUT_MONITORING"
]
},
{
"processName": "memmgrservice",
"acls": [
"ohos.permission.INTERACT_ACROSS_LOCAL_ACCOUNTS_EXTENSION"
]
},
{
"processName": "locationhub",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "useriam",
"acls": [
"ohos.permission.ACCESS_AUTH_RESPOOL",
"ohos.permission.INTERACT_ACROSS_LOCAL_ACCOUNTS_EXTENSION"
]
},
{
"processName": "pinauth",
"acls": [
"ohos.permission.ACCESS_AUTH_RESPOOL"
]
},
{
"processName": "foundation",
"acls": [
"ohos.permission.PUBLISH_SYSTEM_COMMON_EVENT",
"ohos.permission.PERMISSION_START_ABILITIES_FROM_BACKGROUND",
"ohos.permission.GRANT_SENSITIVE_PERMISSIONS",
"ohos.permission.REVOKE_SENSITIVE_PERMISSIONS",
"ohos.permission.MANAGE_HAP_TOKENID",
"ohos.permission.START_INVISIBLE_ABILITY",
"ohos.permission.INPUT_MONITORING",
"ohos.permission.INSTALL_SANDBOX_BUNDLE",
"ohos.permission.INTERACT_ACROSS_LOCAL_ACCOUNTS_EXTENSION",
"ohos.permission.MANAGE_USER_ACCOUNT_INFO"
]
},
{
"processName": "dscreen",
"acls": [
"ohos.permission.CAPTURE_SCREEN"
]
},
{
"processName": "sensors",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "camera_service",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "audio_server",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "msdp_sa",
"acls": [
"ohos.permission.INPUT_MONITORING",
"ohos.permission.ACCESS_DISTRIBUTED_HARDWARE",
"ohos.permission.INTERCEPT_INPUT_EVENT"
]
},
{
"processName": "dslm_service",
"acls": [
"ohos.permission.ACCESS_IDS"
]
},
{
"processName": "accountmgr",
"acls": [
"ohos.permission.ENFORCE_USER_IDM",
"ohos.permission.STORAGE_MANAGER_CRYPT"
]
},
{
"processName": "hdcd",
"acls": [
"ohos.permission.GET_BUNDLE_INFO_PRIVILEGED",
"ohos.permission.INSTALL_BUNDLE",
"ohos.permission.LISTEN_BUNDLE_CHANGE",
"ohos.permission.CHANGE_ABILITY_ENABLED_STATE",
"ohos.permission.REMOVE_CACHE_FILES",
"ohos.permission.START_ABILITIES_FROM_BACKGROUND",
"ohos.permission.PERMISSION_USED_STATS",
"ohos.permission.DUMP",
"ohos.permission.NOTIFICATION_CONTROLLER",
"ohos.permission.PUBLISH_SYSTEM_COMMON_EVENT",
"ohos.permission.CLEAN_APPLICATION_DATA",
"ohos.permission.START_SYSTEM_DIALOG"
]
},
{
"processName": "softbus_server",
"acls": [
"ohos.permission.GET_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "backup_sa",
"acls": [
"ohos.permission.INSTALL_BUNDLE"
]
},
{
"processName": "media_service",
"acls": [
"ohos.permission.CAPTURE_SCREEN"
]
},
{
"processName": "security_component_service",
"acls": [
"ohos.permission.GRANT_SENSITIVE_PERMISSIONS",
"ohos.permission.REVOKE_SENSITIVE_PERMISSIONS"
]
},
{
"processName": "distributedsched",
"acls": [
"ohos.permission.INPUT_MONITORING",
"ohos.permission.MANAGE_MISSIONS"
]
},
{
"processName": "accessibility",
"acls": [
"ohos.permission.INTERCEPT_INPUT_EVENT"
]
},
{
"processName": "dlp_permission_service",
"acls": [
"ohos.permission.INSTALL_SANDBOX_BUNDLE",
"ohos.permission.UNINSTALL_SANDBOX_BUNDLE"
]
},
{
"processName": "quick_fix",
"acls": [
"ohos.permission.INSTALL_QUICK_FIX_BUNDLE",
"ohos.permission.UNINSTALL_QUICK_FIX_BUNDLE"
]
},
{
"processName": "sharing_service",
"acls": [
"ohos.permission.CAPTURE_SCREEN"
]
}
]

View File

@ -1,37 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import platform
import time
# 系统分隔符
SYS_SEQ = os.sep
# 系统平台
SYS_PLATFORM = platform.system()
PATH = os.path.dirname(os.path.realpath(__file__)) + SYS_SEQ
# 脚本的执行日志
LOG_FILE = PATH + SYS_SEQ + "native_sa.log"
# 设备上生成的token info 文件名
TOKEN_INFO_NAME = 'token_info_'+ str(time.time_ns()) +'_{}.txt'
# 设备上生成文件存放位置
TOKEN_INFO_URL = '/data/{}'.format(TOKEN_INFO_NAME)
# 设备上文件生成命令
GENERATING_TOKEN_INFO_COMMAND = 'hdc -t {} shell atm dump -t > {}'
# 下载token info 文件存放路径
DOWNLOAD_TOKEN_INFO_URL = PATH + TOKEN_INFO_NAME
# 文件下载命令
DOWNLOAD_TOKEN_INFO_COMMAND = 'hdc -t {} file recv {} {}'
# 删除设备上的文件命令
CLEAR_TOKEN_INFO_FILE = 'hdc -t {} shell rm -rf {}'

View File

@ -1,49 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
import json
from utils import *
log_tag = 'read_acl_whitelist'
def read_json(path):
set_log_content(LogLevel(2).name, log_tag, 'read {}'.format(path))
if not os.path.exists(path):
set_log_content(LogLevel(2).name, log_tag, '{} file not exits'.format(path))
raise AclCheckException('{} file not exits'.format(path))
try:
with open(path, 'r') as f:
file = f.read()
return file
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise AclCheckException('{} failed to read the file.'.format(path))
def get_acl_dict(file):
try:
acls_dict = {}
f = json.loads(file)
for it in f:
key = it.get('processName')
values = it.get('acls')
acls_dict[key] = values
return acls_dict
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, '{}'.format(e.msg))
raise

View File

@ -1,72 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from utils import *
log_tag = 'resolve_token_info'
def check_and_get(file: list):
nativeAcls = {}
try:
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- invalidPermList check begin --------------------------')
check_pass = True
processName = 'xxxxxxxx'
for it in file:
if it.find('processName') != -1:
processName = it.replace(',', '').split(':')[1].split('"')[1]
elif it.find('invalidPermList') != -1:
check_pass = False
msg = 'invalidPermList information is detected in processName = {}'.format(processName)
set_log_content(LogLevel(2).name, log_tag, msg)
elif check_pass and it.find('nativeAcls') != -1:
bb = it.split(':')
if bb[1].split('"')[1].__len__() == 0:
continue
permissionNameList = bb[1].split('"')[1].split(',')
nativeAcls[processName] = permissionNameList
if check_pass == False:
raise AclCheckException('-------------------------- The invalidPermList check failed --------------------------')
else:
set_log_content(LogLevel(2).name, log_tag,
'-------------------------- The invalidPermList check successful --------------------------')
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise
return nativeAcls
def clear_token_info_txt(path):
try:
os.remove(path)
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
def read_txt(path):
set_log_content(LogLevel(2).name, log_tag, 'read {}'.format(path))
if not os.path.exists(path):
set_log_content(LogLevel(2).name, log_tag, '{} file not exits'.format(path))
raise AclCheckException('{} file not exits!'.format(path))
try:
with open(path, 'r') as f:
file = f.readlines()
return file
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise AclCheckException('{} failed to read the file.'.format(path))

View File

@ -1,71 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
import os
import sys
from subprocess import Popen, PIPE, STDOUT
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from config import *
log_tag = 'utils'
class AclCheckException(Exception):
def __init__(self, msg):
self.msg = msg
def timestamp():
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
class LogLevel(enum.Enum):
Error = 1
Info = 2
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S %a')
def log(msg):
logging.error(msg)
def set_log_content(level, tag, msg):
log_content = timestamp() + ' {}'.format(level) + ' [{}]'.format(tag) + ' {}'.format(msg)
print(log_content)
log(log_content)
return (log_content)
def shell_command(command_list: list):
try:
print(command_list)
process = Popen(command_list, stdout=PIPE, stderr=STDOUT)
exitcode = process.wait()
set_log_content(LogLevel(2).name, log_tag, '{} operation fuccessful!'.format(command_list))
return process, exitcode
except Exception as e:
set_log_content(LogLevel(1).name, log_tag, e.msg)
raise AclCheckException(e.msg)
def hdc_command(command):
print(command)
command_list = command.split(' ')
_, exitcode = shell_command(command_list)
return exitcode

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

View File

@ -1,115 +0,0 @@
[
{
"DEVICE_1":[5, 6, 7, 8, 9],
"DEVICE_2":[1, 2, 3, 4],
"recent-x-y":[513, 1244],
"recent_del-x-y":[360, 1170],
"permisson_ok-x-y":[530, 1100],
"wlan-x-y":[300, 300],
"wlan_button-x-y":[640, 200],
"note_content-x-y":[500, 310],
"kill_keyboard-x-y":[690, 690],
"take_photos-x-y":[360, 1095],
"convert_to_video-x-y":[430, 980],
"convert_to_photos-x-y":[200, 1095],
"last_photos-x-y":[100, 220],
"stop_video-x-y":[320, 1095],
"phone-x-y":[645, 1060],
"screenshot-x-y":[115,480],
"remount":["mount -o rw,remount"],
"stop_hilog":["hilog -w stop"],
"cmp_cmd-level":["", 65],
"get_file_from_dev":[""],
"send_file_to_dev":["", ""]
},
{
"app_name": "settings_wifi",
"entry": "aa start -a com.ohos.settings.MainAbility -b com.ohos.settings",
"crop_range":[140, 450, 30, 500],
"kill_settings": ["killall com.ohos.settings"],
"all_actions": [
[1, "shot_cmd", "settings"], [1, "cmp_cmd-level", "settings"], [8, "wlan-x-y"], [2, "shot_cmd", "wlan_list"], [25, "connect_wifi", "wlan_list"], [1, "shot_cmd", "wifi"],
[1, "shot_cmd", "wifi_connection_status"], [1, "cmp_twice", "wifi", 20, "crop_range"], [1, "kill_settings"]
]
},
{
"app_name": "crash_check",
"entry": "",
"compress_file_recv":["cd /data/log/faultlog/temp && tar -cf crash_log.tar cppcrash*"],
"all_actions": [
[1, "remount"], [1, "process_crash_check", "foundation"], [1, "process_crash_check", "render_service"], [1, "process_crash_check", "appspawn"], [1, "compress_file_recv"],
[1, "get_file_from_dev", "/data/log/faultlog/temp/crash_log.tar"]
]
},
{
"app_name": "notification_bar",
"entry": "",
"pull_down_cmd":["uinput -T -m 500 0 500 80"],
"swipe_up_cmd":["uinput -T -m 500 500 500 300"],
"all_actions": [
[2, "pull_down_cmd"], [2, "pull_down_cmd"], [1, "shot_cmd", "notification_bar"], [1, "cmp_cmd-level", "notification_bar", 70], [1, "swipe_up_cmd"], [1, "swipe_up_cmd"]
]
},
{
"app_name": "camera",
"entry": "",
"check_result":["cd /data/log/hilog && grep -nr PreviewOutputCallback", "OnFrameStarted"],
"compress_log":["cd /data/log/hilog && tar -cf camera_log.tar *"],
"open_camera_log":["rm -rf /data/log/hilog/* && hilog -b X;hilog -b D -T CAMERA;hilog -r"],
"start_camera":["aa start -a com.ohos.camera.MainAbility -b com.ohos.camera"],
"recover_log":["cd data/log/hilog/;hilog -x > camera_log.txt;hilog -b D"],
"check_photos":["aa dump -a | grep com.ohos.photos.MainAbility", "com.ohos.photos"],
"kill_camera": ["killall com.ohos.camera"],
"kill_photos": ["killall com.ohos.photos"],
"all_actions": [
[1, "open_camera_log"], [5, "start_camera"], [3, "take_photos-x-y"], [2, "convert_to_video-x-y"], [3, "take_photos-x-y"], [2, "stop_video-x-y"], [11, "convert_to_photos-x-y"],
[1, "recover_log"], [1, "check_result"], [1, "shot_cmd", "camera"], [1, "compress_log"], [1, "check_photos"], [1, "get_file_from_dev", "/data/log/hilog/camera_log.tar"],
[1, "kill_camera"], [1, "kill_photos"]
]
},
{
"app_name": "photos",
"entry": "",
"pull_down_cmd":["uinput -T -m 500 0 550 30"],
"start_screenshot": ["aa start -a com.ohos.screenshot.ServiceExtAbility -b com.ohos.screenshot"],
"start_photos": ["aa start -a com.ohos.photos.MainAbility -b com.ohos.photos"],
"process_check": ["ps -elf", "com.ohos.medialibrary.medialibrarydata"],
"kill_photos": ["killall com.ohos.photos"],
"all_actions": [
[2, "pull_down_cmd"], [5, "screenshot-x-y"], [5, "start_photos"], [2, "last_photos-x-y"], [1, "shot_cmd", "photos"], [1, "cmp_cmd-level", "photos", 70],
[1, "process_check"], [2, "sandbox_path_check"], [1, "kill_photos"]
]
},
{
"app_name": "note",
"entry": "aa start -a MainAbility -b com.ohos.note",
"kill_note": ["killall com.ohos.note"],
"all_actions": [
[2, "kill_keyboard-x-y"], [2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [5, "note_content-x-y"], [2, "note_content-x-y"], [1, "shot_cmd", "note"], [1, "cmp_cmd-level", "note", 70], [2, "recent-x-y"], [1, "recent_del-x-y"]
]
},
{
"app_name": "contacts",
"entry": "aa start -a com.ohos.contacts.MainAbility -b com.ohos.contacts",
"kill_contacts": ["killall com.ohos.contacts"],
"all_actions": [
[2, "phone-x-y"], [2, "phone-x-y"], [1, "shot_cmd", "contacts"], [1, "cmp_cmd-level", "contacts"], [1, "kill_contacts"]
]
},
{
"app_name": "mms",
"entry": "aa start -a com.ohos.mms.MainAbility -b com.ohos.mms",
"kill_mms": ["killall com.ohos.mms"],
"all_actions": [
[1, "shot_cmd", "mms"], [1, "cmp_cmd-level", "mms"], [1, "kill_mms"]
]
},
{
"app_name": "distributedmusicplayer",
"entry": "aa start -a ohos.samples.distributedmusicplayer.MainAbility -b ohos.samples.distributedmusicplayer",
"kill_distributedmusicplayer": ["killall ohos.samples.distributedmusicplayer"],
"all_actions": [
[2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [1, "shot_cmd", "distributedmusicplayer"], [1, "cmp_cmd-level", "distributedmusicplayer"], [1, "kill_distributedmusicplayer"]
]
}
]

View File

@ -1,657 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import parse
import encodings
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime
import shutil
import numpy
import cv2
import pytesseract
sys.path.append(os.path.dirname(os.path.realpath(__file__)).replace('resource', 'acls_check'))
sys.path.append(os.path.dirname(os.path.realpath(__file__)).replace('resource', 'APL_compare_03'))
from acl_check import *
from compare import *
from pytesseract import Output
from PIL import Image
def print_to_log(str):
time = datetime.datetime.now()
str = "[{}] {}".format(time, str)
print(str)
with open(os.path.join(args.save_path,
'test_{}.log'.format(args.device_num)),
mode='a',
encoding='utf-8') as file:
console = sys.stdout
sys.stdout = file
print(str)
sys.stdout = console
file.close()
def enter_cmd(mycmd, waittime=0, printresult=1):
if mycmd == "":
return
global cmd_retry_cnt
cmd_retry_cnt = 1
enter_cmdRetry = 2
while enter_cmdRetry:
enter_cmdRetry -= 1
try:
p = subprocess.Popen(mycmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result, unused_err = p.communicate(timeout=25)
try:
result = result.decode(encoding="utf-8")
except UnicodeDecodeError:
result = result.decode('gbk', errors='ignore')
break
except Exception as e:
result = 'retry failed again'
print_to_log(e)
cmd_retry_cnt += 1
p.kill()
if printresult == 1:
print_to_log(mycmd)
print_to_log(result)
sys.stdout.flush()
if waittime != 0:
time.sleep(waittime)
return result
def enter_shell_cmd(shellcmd, waittime=1, printresult=1):
if shellcmd == "":
return
cmd = "hdc -t {} shell \"{}\"".format(args.device_num, shellcmd)
return enter_cmd(cmd, waittime, printresult)
def sys_exit():
enter_shell_cmd("cd /data/log/faultlog/temp && tar -cf after_test_cppcrash{}.tar cppcrash*".format(args.device_num))
file_from_dev("/data/log/faultlog/temp/after_test_cppcrash{}.tar".format(args.device_num), \
os.path.normpath(args.save_path))
enter_shell_cmd("cd /data/log/faultlog/faultlogger && tar -cf after_test_jscrash{}.tar jscrash*".format(args.device_num))
file_from_dev("/data/log/faultlog/faultlogger/after_test_jscrash{}.tar".format(args.device_num), \
os.path.normpath(args.save_path))
print_to_log("SmokeTest: SmokeTest find some key problems!")
print_to_log("SmokeTest: End of check, test failed!")
sys.exit(98)
def file_to_dev(src, dst):
cmd = "hdc -t {} file send \"{}\" \"{}\"".format(args.device_num, src, dst)
return enter_cmd(cmd, 1, 1)
def file_from_dev(src, dst):
cmd = "hdc -t {} file recv \"{}\" \"{}\"".format(args.device_num, src, dst)
return enter_cmd(cmd, 1, 1)
def connect_check():
connection_status = enter_cmd("hdc list targets", 2)
connection_cnt = 0
while args.device_num not in connection_status and connection_cnt < 15:
connection_status = enter_cmd("hdc list targets", 2)
connection_cnt += 1
if connection_cnt == 15:
print_to_log("SmokeTest: Device disconnection!!")
print_to_log("SmokeTest: End of check, test failed!")
sys.exit(101)
def sandbox_check(process):
print_to_log("SmokeTest: start to check sandbox path")
medialibrarydata_pidnum = enter_shell_cmd("pgrep -f {}".format(process), 1)
medialibrarydata_pidnum = medialibrarydata_pidnum.strip()
sandboxf = enter_shell_cmd("echo \"ls /storage/media/local/\"|nsenter -t {} -m sh".format(medialibrarydata_pidnum), 1)
if "files" not in sandboxf:
print_to_log("SmokeTest: error: can not find sandbox path : /storage/media/local/files")
return -1
else:
print_to_log("SmokeTest: success: find sandbox path : /storage/media/local/files")
return 1
def get_coordinate(path, target):
wifi_numbers = 8
height = 97
wifi_range = [236, 286, 45, 300]
coordinate = []
img = cv2.imread(path)
tessdata_dir_config = '--tessdata-dir "C:\\Program Files (x86)\\Tesseract-OCR\\tessdata"'
while wifi_numbers:
wifi_range[0] += height
wifi_range[1] += height
print_to_log(wifi_range)
data_img = img[wifi_range[0]:wifi_range[1], wifi_range[2]:wifi_range[3]]
data = pytesseract.image_to_data(data_img,
output_type=Output.DICT,
config=tessdata_dir_config,
lang='eng')
for i in range(len(data['text'])):
if data['text'][i] == target:
dx = int((wifi_range[2] + wifi_range[3]) / 2)
dy = int((wifi_range[0] + wifi_range[1]) / 2)
coordinate.append(dx)
coordinate.append(dy)
wifi_numbers -= 1
if coordinate:
break
return coordinate
def connect_wifi(prefix, pic):
try:
data = get_coordinate("{}\\{}_{}".format(args.save_path, prefix, pic),
"testapold")
enter_shell_cmd("uinput -M -m {} {} -c 0".format(data[0], data[1]),
WAIT_TIME_TWO)
enter_shell_cmd("uinput -M -m 360 200 -c 0")
enter_shell_cmd("uinput -K -d 2032 -u 2032 -d 2017 -u 2017 -d 2035"
" -u 2035 -d 2035 -u 2035 -d 2039 -u 2039 -d 2000"
" -u 2000 -d 2034 -u 2034 -d 2020 -u 2020 -d 2001 -u 2001")
enter_shell_cmd("uinput -M -m 360 200 -c 0")
enter_shell_cmd("uinput -M -m 50 1140 -c 0")
enter_shell_cmd("uinput -M -m 500 1020 -c 0")
enter_shell_cmd("uinput -M -m 50 1140 -c 0")
enter_shell_cmd("uinput -K -d 2054 -u 2054")
enter_shell_cmd("snapshot_display -f /data/local/tmp/screen_test/{}".format("testapold.jpeg"))
file_from_dev("/data/local/tmp/screen_test/{}".format("testapold.jpeg"), args.save_path)
enter_shell_cmd("uinput -M -m 550 680 -c 0", single_action[0])
except Exception as e:
print(e)
print_to_log("SmokeTest: wifi list loading errror!")
def calculate(image1, image2):
image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR)
image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR)
hist1 = cv2.calcHist([image1], [0], None, [256], [0.0, 255.0])
hist2 = cv2.calcHist([image2], [0], None, [256], [0.0, 255.0])
degree = 0
for i in range(len(hist1)):
if hist1[i] != hist2[i]:
degree = degree + (1 - abs(hist1[i] - hist2[i]) / max(hist1[i], hist2[i]))
else:
degree = degree + 1
degree = degree / len(hist1)
return degree
def classify_hist_with_split(image1, image2, size=(256, 256)):
image1 = Image.open(image1)
image2 = Image.open(image2)
image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR)
image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR)
image1 = cv2.resize(image1, size)
image2 = cv2.resize(image2, size)
sub_image1 = cv2.split(image1)
sub_image2 = cv2.split(image2)
sub_data = 0
for im1, im2 in zip(sub_image1, sub_image2):
sub_data += calculate(im1, im2)
sub_data = sub_data / 3
return sub_data
def crop_picture(prefix, pic, crop_range):
try:
pic_path = "{}\\{}_{}".format(args.save_path, prefix, pic)
save_path = "{}\\{}_{}".format(args.save_path, prefix, pic)
im = cv2.imread(pic_path)
im = im[crop_range[0]:crop_range[1], crop_range[2]:crop_range[3]]
cv2.imwrite(save_path, im)
except Exception as e:
pass
def cmp_picture(prefix, pic, num=1):
if num == 1:
img1_path = "{}\\{}".format(args.anwser_path, pic)
else:
img1_path = "{}\\2_{}".format(args.anwser_path, pic)
img2_path = "{}\\{}_{}".format(args.save_path, prefix, pic)
cmp_init = 0
try:
cmp_result = classify_hist_with_split(img1_path, img2_path)
print("compare result:" + "%.6f%%" % (cmp_result * 100))
return cmp_result * 100
except Exception as reason:
print("no such file: {}_{}".format(prefix, pic))
return cmp_init
def shot_and_cmp(image):
prefix = args.device_num
enter_shell_cmd(
"snapshot_display -f /data/local/tmp/screen_test/{}_{}".format(prefix, image))
file_from_dev("/data/local/tmp/screen_test/{}_{}".format(prefix, image),
args.save_path)
similarity = cmp_picture(prefix, image)
print_to_log("SmokeTest: launcher similarity is {}%".format(similarity))
return similarity
def distributed_test():
if "1/2" in args.test_num or "2/2" in args.test_num:
report_path = os.path.normpath(
os.path.join(args.save_path, "distributed_report.txt"))
if args.test_num == "2/2":
enter_shell_cmd("ifconfig eth0 192.168.0.1")
ping_result = enter_shell_cmd("ping 192.168.0.2 -i 1 -c 2", 3)
file_is_exist = enter_shell_cmd(
"cd /data; find . -name distributed_report.txt")
ping_cnt = 0
wait_cnt = 0
while "2 packets transmitted, 2 received" not in ping_result and ping_cnt < 20:
ping_result = enter_shell_cmd("ping 192.168.0.2 -i 1 -c 2",
WAIT_TIME_FOUR)
ping_cnt += 1
if ping_cnt == 30:
print_to_log("SmokeTest: Ping failed, timeout of 80s")
sys_exit()
while "distributed_report.txt" not in file_is_exist and wait_cnt < 30:
print_to_log("SmokeTest: waiting for the distributed test to end ")
file_is_exist = enter_shell_cmd(
"cd /data; find . -name distributed_report.txt", WAIT_TIME_FOUR)
wait_cnt += 1
elif args.test_num == "1/2":
enter_shell_cmd("ifconfig eth0 192.168.0.2")
ping_result = enter_shell_cmd(
"ping 192.168.0.1 -i 1 -c 2", WAIT_TIME_FOUR)
ping_cnt = 0
while "2 packets transmitted, 2 received" not in ping_result and ping_cnt < 20:
ping_result = enter_shell_cmd("ping 192.168.0.1 -i 1 -c 2",
WAIT_TIME_FOUR)
ping_cnt += 1
if ping_cnt == 30:
print_to_log("SmokeTest: Ping failed, timeout of 80s")
print_to_log("SmokeTest: ##### case 0 : distributed test start #####")
execute_path = os.path.normpath(os.path.join(args.tools_path, "resource"))
os.system(
"cd {} && python distributedtest.py --path {}".format(execute_path,
args.save_path))
distributed_result = ""
try:
with open(report_path, mode='r', encoding='utf-8', errors='ignore') as f:
f.seek(0)
distributed_result = f.read()
f.close()
except Exception as reason:
print_to_log("SmokeTest: distributed_report.txt do not exist!")
if "distributedcalc" in distributed_result:
print_to_log("SmokeTest: testcase 0, distributed is ok!")
else:
print_to_log("SmokeTest: error:testcase 0, distributed failed!")
sys_exit()
enter_shell_cmd("ifconfig eth0 down")
def open_wlan():
enter_shell_cmd(
"aa start -a com.ohos.settings.MainAbility -b com.ohos.settings",
WAIT_TIME_FOUR)
enter_shell_cmd("uinput -M -m 300 300 -c 0", WAIT_TIME_TWO)
enter_shell_cmd("uinput -M -m 640 200 -c 0", WAIT_TIME_FOUR)
time.sleep(WAIT_TIME_FOUR)
enter_shell_cmd("killall com.ohos.settings", WAIT_TIME_TWO)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--config', type=str,
default='.\\app_capture_screen_test_config.json')
parser.add_argument('--test_num', type=str, default='1/1')
parser.add_argument('--tools_path', type=str,
default='D:\\DeviceTestTools\\screenshot\\')
parser.add_argument('--anwser_path', type=str,
default='D:\\DeviceTestTools\\screenshot\\resource')
parser.add_argument('--save_path', type=str,
default='D:\\DeviceTestTools\\screenshot')
parser.add_argument('--device_num', type=str, default='null')
parser.add_argument('--pr_url', type=str,
default='developtools_integration_verification')
args = parser.parse_args()
if args.device_num == 'null':
result = enter_cmd("hdc list targets", 1, 0)
print(result)
args.device_num = result.split()[0]
with open(args.config) as f:
all_app = json.load(f)
cmp_status = 0
global_pos = all_app[0]
WAIT_TIME_TWO = 2
WAIT_TIME_FOUR = 4
reboot_cnt = 2
while reboot_cnt:
reboot_cnt -= 1
enter_shell_cmd("mkdir -p /data/local/tmp/screen_test/train_set")
enter_shell_cmd("power-shell wakeup;power-shell setmode 602")
rmlock_cnt = 3
while rmlock_cnt:
enter_shell_cmd("uinput -T -m 425 400 425 1000;uinput -T -m 425 1000 425 400")
rmlock_cnt -= 1
enter_shell_cmd("hilog -w stop")
enter_shell_cmd(
"cd /data/log/hilog && tar -cf system_start_log_{}.tar *".format(args.device_num))
file_from_dev("/data/log/hilog/system_start_log_{}.tar".format(args.device_num),
args.save_path)
connect_check()
launcher_similarity = shot_and_cmp("launcher.jpeg")
power_state = enter_shell_cmd("hidumper -s 3308")
if "State=2" not in power_state:
print_to_log("SmokeTest: ERROR, DISPLAY POWER MANAGER DUMP State ≠ 2")
if launcher_similarity >= 80:
print_to_log("SmokeTest: launcher screenshot comparison is ok!")
break
elif reboot_cnt >= 1:
print_to_log("SmokeTest: launcher screenshot comparison failed, reboot and try!!!")
enter_shell_cmd("rm -rf /data/*;reboot")
for i in range(5):
enter_cmd("hdc list targets", 10)
else:
print_to_log("SmokeTest: launcher screenshot comparison failed")
sys_exit()
enter_shell_cmd("cat /proc/`pidof foundation`/smaps_rollup")
print_to_log("\nSmokeTest: ########## First check key processes start ##############")
lose_process = []
process_pid = {}
with open(os.path.normpath(os.path.join(args.tools_path, "resource/process.txt")), "r+") as f:
text = f.read()
two_check_process_list = text.split('#####')[1].split()[0:-1]
other_process_list = text.split('#####')[2].split()
for pname in two_check_process_list:
pids = enter_cmd("hdc -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
int(pidlist[0])
for pid in pidlist:
int(pid)
process_pid[pname] = pidlist
except:
lose_process.append(pname)
all_p = enter_shell_cmd("ps -elf")
for pname in other_process_list:
findp = all_p.find(pname, 0, len(all_p))
if findp == -1:
lose_process.append(pname)
if lose_process:
print_to_log("SmokeTest: error: %s, These processes do not exist!!!" % lose_process)
sys_exit()
else:
print_to_log("SmokeTest: first processes check is ok")
apl_check_main(args.device_num)
apl_compare = os.path.normpath(os.path.join(args.tools_path, "APL_compare_03", "apl_compare.log"))
try:
with open(apl_compare, mode='r', encoding='utf-8', errors='ignore') as compare_file:
compare_file.seek(0)
apl_result = compare_file.read()
compare_file.close()
except Exception as reason:
print_to_log("SmokeTest: error: apl_compare.log do not exist!")
if "APL Check failed" in apl_result:
print_to_log("SmokeTest: error: apl check failed")
sys_exit()
main(args.device_num)
native_sa = os.path.normpath(os.path.join(args.tools_path, "acls_check", "native_sa.log"))
try:
with open(native_sa, mode='r', encoding='utf-8', errors='ignore') as native_file:
native_file.seek(0)
acl_result = native_file.read()
native_file.close()
except Exception as reason:
print_to_log("SmokeTest: error: native_sa.log do not exist!")
if "ACL check failed" in acl_result:
print_to_log("SmokeTest: error: acl check failed")
sys_exit()
try:
args.test_num.index('/')
idx_total = args.test_num.split('/')
if len(idx_total) != 2:
print_to_log("SmokeTest: test_num is invaild !!!")
sys_exit()
elif idx_total[1] == '1':
idx_list = global_pos['DEVICE_2']+global_pos['DEVICE_1']
else:
idx_list = global_pos['DEVICE_{}'.format(idx_total[0])]
except ValueError as e:
print_to_log(e)
idx_list = list(map(eval, args.test_num.split()))
print_to_log("SmokeTest: start to carry out the following testcases: ")
print_to_log("SmokeTest: testcase number: {} ".format(idx_list))
open_wlan()
fail_idx_list = []
fail_name_list = []
smoke_first_failed = ''
for idx in idx_list:
single_app = all_app[idx]
sys.stdout.flush()
call_app_cmd = single_app['entry']
capture_screen_cmd = "snapshot_display -f /data/local/tmp/screen_test/{}_{}"
print_to_log("\nSmokeTest: ##### case {} : {} test start #####".format(idx, single_app['app_name']))
testcnt = 3
while testcnt:
testok = 0
if testcnt != 3:
print_to_log("SmokeTest: this testcase try again >>>>>>:\n")
if single_app['entry'] != "":
enter_shell_cmd(call_app_cmd, WAIT_TIME_FOUR)
print_to_log("SmokeTest: execute command {}".format(single_app['all_actions']))
prefix = args.device_num
raw_pic_name = ''
pic_name = ''
for single_action in single_app['all_actions']:
if type(single_action[1]) == str and single_action[1] == 'shot_cmd':
if len(single_action) == 3:
pic_name = "{}{}".format(single_action[2], ".jpeg")
else:
pic_name = "{}{}".format(single_app['app_name'], ".jpeg")
enter_shell_cmd("rm /data/local/tmp/screen_test/*{}".format(pic_name))
enter_shell_cmd(capture_screen_cmd.format(prefix, pic_name))
file_from_dev("/data/local/tmp/screen_test/{}_{}".format(prefix, pic_name), args.save_path)
next_cmd = ""
elif type(single_action[1]) == str and single_action[1] == 'cmp_twice':
next_cmd = ""
sys.stdout.flush()
pic = "{}{}".format(single_action[2], ".jpeg")
similarity = single_action[3]
crop_range = single_app[single_action[4]]
crop_picture(prefix, pic, crop_range)
first_similarity = cmp_picture(prefix, pic)
second_similarity = cmp_picture(prefix, pic, WAIT_TIME_TWO)
print_to_log("SmokeTest: first picture similarity is {}%".format(first_similarity))
print_to_log("SmokeTest: second picture similarity is {}%".format(second_similarity))
if first_similarity >= similarity or second_similarity >= similarity:
if testok != -1:
testok = 1
print_to_log("SmokeTest: {} screenshot check is ok".format(pic))
else:
testok = -1
print_to_log("SmokeTest: {} screenshot check is abnarmal".format(pic))
elif type(single_action[1]) == str and single_action[1] == 'cmp_cmd-level':
next_cmd = ""
sys.stdout.flush()
if len(single_action) == 4:
similarity = single_action[3]
else:
similarity = global_pos['cmp_cmd-level'][1]
similarity = int(similarity)
print_to_log("SmokeTest: start to contrast screenshot")
pic = "{}{}".format(single_action[2], ".jpeg")
crop_range = [80, 1200, 0, 720]
crop_picture(prefix, pic, crop_range)
pic_similarity = cmp_picture(prefix, pic)
print_to_log("SmokeTest: picture similarity is {}%".format(pic_similarity))
if len(single_action) >= 3:
if pic_similarity >= similarity:
if testok != -1:
testok = 1
print_to_log("SmokeTest: {} screenshot check is ok".format(pic))
else:
testok = -1
print_to_log("SmokeTest: {} screenshot check is abnarmal".format(pic))
elif type(single_action[1]) == str and single_action[1] == 'install_hap':
next_cmd = ""
if len(single_action) == 3:
enter_cmd("hdc -t {} install \"{}\"".format(args.device_num,\
os.path.normpath(os.path.join(args.tools_path, single_action[2]))))
elif type(single_action[1]) == str and single_action[1] == 'get_file_from_dev':
next_cmd = ""
if len(single_action) == 3:
enter_cmd("hdc -t {} file recv \"{}\" \"{}\"".format(args.device_num,\
single_action[2], os.path.normpath(args.save_path)))
elif type(single_action[1]) == str and single_action[1] == 'send_file_to_dev':
next_cmd = ""
if len(single_action) == 4:
enter_cmd("hdc -t {} file send \"{}\" \"{}\"".format(args.device_num,\
os.path.normpath(os.path.join(args.tools_path, single_action[2])), single_action[3]))
elif type(single_action[1]) == str and single_action[1] == 'connect_wifi':
next_cmd = ""
pic = "{}{}".format(single_action[2], ".jpeg")
connect_wifi(prefix, pic)
elif type(single_action[1]) == str and single_action[1] == 'sandbox_path_check':
next_cmd = ""
if sandbox_check("com.ohos.medialibrary.medialibrarydata") == 1 and testok == 1:
testok = 1
else:
testok = -1
elif type(single_action[1]) == str and single_action[1] == 'process_crash_check':
next_cmd = ""
if len(single_action) == 3:
p = enter_shell_cmd("cd /data/log/faultlog/temp && grep \"Process name\" -rnw ./",\
single_action[0])
result = "".join(p)
findsome = result.find(single_action[2], 0, len(result))
if findsome != -1:
testok = -1
print_to_log("SmokeTest: \"{}\" error:find fatal crash \"{}\"!".format(single_action[1],\
single_action[2]))
sys_exit()
else:
testok = 1
print_to_log("SmokeTest: \"{}\" result is ok, not find fatal\
crash \"{}\"!".format(single_action[1], single_action[2]))
sys.stdout.flush()
elif type(single_action[1]) == str:
if single_action[1] not in single_app.keys():
target_ = global_pos[single_action[1]]
else:
target_ = single_app[single_action[1]]
if type(target_[0]) == str:
next_cmd = ""
p = enter_shell_cmd(target_[0], single_action[0])
result = "".join(p)
if len(target_) > 1:
findsome = result.find(target_[1], 0, len(result))
if findsome != -1:
testok = 1
print_to_log("SmokeTest: \"{}\" check ok, find \"{}\"!".format(target_[0], target_[1]))
else:
testok = -1
print_to_log("SmokeTest: \"{}\" check failed, no \"{}\"!".format(target_[0],target_[1]))
sys.stdout.flush()
else:
next_cmd = "uinput -M -m {} {} -c 0".format(target_[0], target_[1])
else:
next_cmd = "uinput -M -m {} {} -c 0".format(single_action[1], single_action[2])
enter_shell_cmd(next_cmd, single_action[0])
if testok == 1:
print_to_log("SmokeTest: testcase {}, {} is ok!".format(idx, single_app['app_name']))
testcnt = 0
elif testok == -1 and smoke_first_failed == '':
if testcnt == 1:
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
smoke_first_failed = single_app['app_name']
print_to_log("SmokeTest: error:testcase {}, {} is failed!".format(idx, single_app['app_name']))
testcnt -= 1
elif testok == -1 and smoke_first_failed != '':
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
print_to_log("SmokeTest: error:testcase {}, {} is failed!".format(idx, single_app['app_name']))
testcnt = 0
else:
testcnt = 0
connect_check()
enter_shell_cmd("cd /data/log/faultlog/temp && grep \"Process name\" -rnw ./", 1)
enter_shell_cmd("cd /data/log/faultlog/faultlogger && grep \"Process name\" -rnw ./", 1)
fail_str_list = [str(x) for x in fail_idx_list]
reboot_test_num = " ".join(fail_str_list)
if len(fail_idx_list) != 0:
print_to_log("SmokeTest: failed testcase number: {} ".format(fail_str_list))
print_to_log("SmokeTest: check \"reboot\" in reboot.txt".format(args.save_path))
with open(os.path.normpath(os.path.join(args.tools_path, "reboot.txt")), mode='a+') as f:
f.seek(0)
reboot_result = f.read()
f.close()
if len(reboot_result) < 1 and reboot_cnt >= 1:
print_to_log("SmokeTest: no \"reboot\" found in the reboot.txt")
print_to_log("SmokeTest: the device will reboot and try the failed testcase")
print_to_log("SmokeTest: mkdir {}\\reboot".format(args.save_path))
os.system("mkdir {}\\reboot".format(args.save_path))
print_to_log("SmokeTest: write \"reboot\" into reboot.txt".format(args.save_path))
with open(os.path.normpath(os.path.join(args.tools_path, "reboot.txt")), mode='w') as f:
f.write("reboot")
f.close()
print_to_log("SmokeTest: error: name {}, index {}, failed, reboot".format(fail_name_list,fail_idx_list))
enter_shell_cmd("rm -rf /data/* && reboot")
reboot_result_list = enter_cmd("hdc list targets", 2)
number = 0
while args.device_num not in reboot_result_list and number < 15:
reboot_result_list = enter_cmd("hdc list targets", 2)
number += 1
enter_shell_cmd("rm /data/log/hilog/*;hilog -r;hilog -w start -l 400000000 -m none", 1)
py_cmd = os.system("python {}\\resource\\capturescreentest.py --config \
{}\\resource\\app_capture_screen_test_config.json --anwser_path {} \
--save_path {}\\reboot --tools_path {} --device_num {} --test_num \"{}\"".format(args.tools_path, \
args.tools_path, args.anwser_path, args.save_path, args.tools_path, args.device_num, reboot_test_num))
if py_cmd == 0:
sys.exit(0)
elif py_cmd == 98:
sys.exit(98)
else:
sys.exit(101)
else:
print_to_log("SmokeTest: error: name {}, index {}, failed".format(fail_name_list, fail_idx_list))
sys_exit()
else:
print_to_log("SmokeTest: all testcase is ok")
print_to_log("SmokeTest: End of check, test succeeded!")
sys.exit(0)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

View File

@ -1,152 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import re
import encodings
import subprocess
import argparse
import os
import cv2
import pytesseract
from ast import parse
from PIL import Image
def enter_cmd(mycmd, waittime):
if mycmd == "":
return
global cmd_retry_cnt
cmd_retry_cnt = 1
enter_cmd_retry = 2
while enter_cmd_retry:
enter_cmd_retry -= 1
try:
p = subprocess.Popen(mycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result, unused_err = p.communicate(timeout=25)
try:
result=result.decode(encoding="utf-8")
except UnicodeDecodeError:
result=result.decode('gbk', errors='ignore')
break
except Exception as e:
result = 'retry failed again'
print(e)
cmd_retry_cnt += 1
p.kill()
if waittime != 0:
time.sleep(waittime)
print(result)
return result
def enter_shell_cmd(shellcmd, waittime, sn):
global sn1
global sn2
if shellcmd == "":
return
cmd = "hdc_std -t {} shell \"{}\"".format(sn, shellcmd)
return enter_cmd(cmd, waittime)
def file_from_dev(src, dst, sn):
cmd = "hdc_std -t {} file recv \"{}\" \"{}\"".format(sn, src, dst)
return enter_cmd(cmd, 2)
def get_devices_sn():
global sn1
global sn2
cmd_sn = os.popen("hdc_std list targets").read()
device_sn = re.findall('[\w+]{32}', cmd_sn)
sn1 = device_sn[0]
sn2 = device_sn[1]
print(sn1)
print(sn2)
def orc(path):
pytesseract.pytesseract.tesseract_cmd = 'C:\\Program Files (x86)\\Tesseract-OCR\\tesseract.exe'
tessdata_dir_config = '--tessdata-dir "C:\\Program Files (x86)\\Tesseract-OCR\\tessdata"'
image = Image.open(path)
code = pytesseract.image_to_string(image, config=tessdata_dir_config)
return code
def crop_picture(pic, target, crop_range):
pic_path = "{}\\{}".format(args.path, pic)
save_path = "{}\\{}".format(args.path, target)
im = cv2.imread(pic_path)
im = im[crop_range[0]:crop_range[1], crop_range[2]:crop_range[3]]
cv2.imwrite(save_path, im)
def distributed_calc():
time_one = 1
time_two = 2
time_four = 4
crop_range = [520, 585, 250, 460]
enter_shell_cmd("aa start -a ohos.samples.distributedcalc.MainAbility -b ohos.samples.distributedcalc",\
time_four, sn1)
enter_shell_cmd("uinput -M -m 500 1130 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 500 1130 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 610 110 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 380 1150 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 610 110 -c 0", time_two, sn1)
enter_shell_cmd("snapshot_display -f /data/distributedcalc.jpeg", time_two, sn1)
file_from_dev("/data/distributedcalc.jpeg", "{}\\distributedcalc.jpeg".format(args.path), sn1)
enter_shell_cmd("uinput -M -m 580 1090 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 520 520 -c 0", time_two, sn2)
enter_shell_cmd("uinput -M -m 520 520 -c 0", time_two, sn2)
enter_shell_cmd("snapshot_display -f /data/pin.jpeg", time_two, sn2)
file_from_dev("/data/pin.jpeg", "{}\\pin.jpeg".format(args.path), sn2)
crop_picture("pin.jpeg", "pin_code.jpeg", crop_range)
enter_shell_cmd("uinput -M -m 340 530 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 340 530 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 60 1145 -c 0", time_two, sn1)
code = orc(f"{args.path}\\pin_code.jpeg")
pin_code = re.findall("[0-9]{6}", code)[0]
print(pin_code)
for i in pin_code:
if i == "0":
enter_shell_cmd("uinput -M -m 672 800 -c 0", time_one, sn1)
else:
j = int(i) - 1
dx = 42 + j * 70
enter_shell_cmd(f"uinput -M -m {dx} 800 -c 0", time_one, sn1)
time.sleep(1)
enter_shell_cmd("uinput -M -m 60 1145 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 500 600 -c 0", time_two, sn1)
enter_shell_cmd("killall ohos.samples.distributedcalc", time_two, sn1)
enter_shell_cmd("aa start -a ohos.samples.distributedcalc.MainAbility -b ohos.samples.distributedcalc",\
time_four, sn1)
enter_shell_cmd("uinput -M -m 610 110 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 580 1090 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 580 1090 -c 0", time_two, sn1)
enter_shell_cmd("uinput -M -m 500 1130 -c 0", time_two, sn2)
enter_shell_cmd("aa dump -a | grep distributedcalc > /data/distributed_report.txt", time_two, sn2)
file_from_dev("/data/distributed_report.txt", "{}\\distributed_report.txt".format(args.path), sn2)
enter_shell_cmd("killall ohos.samples.distributedcalc", time_two, sn2)
enter_shell_cmd("killall ohos.samples.distributedcalc", time_two, sn1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--path', type=str, default = '.')
args = parser.parse_args()
sn1 = ""
sn2 = ""
get_devices_sn()
distributed_calc()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 148 KiB

View File

@ -1,53 +0,0 @@
####First_check_at_begin__Second_check_at_end__Two_results_must_be_same#####
com.ohos.launcher
render_service
####only_check_these_processes_are_exitst#####
hdf_devmgr
param_watcher
storage_manager
appspawn
hilogd
samgr
storage_daemon
uinput_inject
multimodalinput
huks_service
memmgrservice
bluetooth_servi
resource_schedu
bgtaskmgr_servi
audio_server
deviceauth_service
softbus_server
wifi_hal_service
faultloggerd
accountmgr
time_service
distributeddata
useriam
inputmethod_ser
ui_service
netmanager
sensors
media_service
wifi_manager_se
installs
hiview
telephony
camera_service
foundation
hdcd
light_host
vibrator_host
sensor_host
input_user_host
camera_host
audio_host
wifi_host
usb_host
blue_host
wifi_hal_service
com.ohos.systemui
device_usage_st
power_host

Binary file not shown.

Before

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 15 KiB

View File

@ -0,0 +1,13 @@
Launcher
ProcessCheck
APLCheck
ACLCheck
SettingsWifi
CrashCheck
Photos
Contacts
Mms
DistributedMusicPlayer
Camera
NotificationBar
Note