add readme and init cases

Signed-off-by: taiyi.pei <peitaiyi@huawei.com>
This commit is contained in:
taiyi.pei 2022-12-12 20:49:08 +08:00
parent 97f14762b3
commit a0011dd0a1
73 changed files with 13674 additions and 28 deletions

View File

@ -1,39 +1,46 @@
# developtools_integration_verification # 集成验证部件
#### 介 ##
{**以下是 Gitee 平台说明,您可以替换此简介** 集成验证包含门禁冒烟和每日构建两个功能:
Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN。专为开发者提供稳定、高效、安全的云端软件开发协作平台 - 门禁冒烟对开发者的提交进行基本功能、部件功能和按场景的端到端测试;
无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 [https://gitee.com/enterprises](https://gitee.com/enterprises)} - 每日构建定时取master和release分支代码进行版本归档、全量TDD测试、最小系统测试和部件化的架构看护检查。
#### 软件架构 **图1** 集成验证架构图
软件架构说明
![架构图](figures/arch.png)
#### 安装教程 ## 目录
1. xxxx ```undefined
2. xxxx /developtools/integration_verification
3. xxxx ├── cases # 用例
│   ├── daily # 每日构建
│   │   └── mini_system # 最小系统测试
│   └── smoke # 门禁冒烟
│   ├── audio # 音频用例
│   ├── basic # 基础功能用例
│   │   ├── screenshot32
│   │   └── screenshot64
│   ├── distributed # 分布式场景端到端用例
│   └── video # 视频用例
├── test # 自测试用例
└── tools # 公共工具集
└── rom_ram_analyzer # ROM/RAM分析工具
```
#### 使用说明 ## 说明
1. xxxx ### 使用说明
2. xxxx #### 关联仓和冒烟用例
3. xxxx 当新建仓或仓对应系统功能发生变化时通常需要在门禁中添加和修改冒烟用例的关联。关联关系通过cases/smoke/repo_cases_matrix.csv文件配置文件第一列为仓名从第二列为部件名称第三列开始为不同形态的开发板包括虚拟开发板仓与开发板的交集中填写测试用例名称。示例如下
#### 参与贡献 | repoistory | bundle | board1 | board2 | ... |
| ---------- | ---------- | --------------------- | --------------------- | ---- |
| repo1 | component1 | test_case1 | test_case1;test_case2 | ... |
| repo2 | component2 | test_case1;test_case2 | test_case1 | ... |
1. Fork 本仓库 一般一个测试用例对应一个可执行文件,在门禁构建时产生和烧录设备。
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
## 相关仓
#### 特技 [**developtools\_integration\_verification**](https://gitee.com/openharmony/developtools_integration_verification/blob/master/README_zh.md)
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

View File

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import parse
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime
import serial
import threading
def GetDirSize(dir_path):
if not os.path.exists(dir_path):
PrintToLog("\n\nERROR: %s, dir are not exist!!!\n" % dir_path)
PrintToLog("End of check, test failed!")
sys.exit(99)
size = 0
for root, dirs, files in os.walk(dir_path):
for name in files:
sz = os.path.getsize(os.path.join(root, name))
print('{} : {}byte'.format(os.path.join(root, name), sz))
size += sz
PrintToLog('total size: {:.2f}M'.format(size/1024/1024))
return size
def PrintToLog(str):
time = datetime.datetime.now()
str = "[{}] {}".format(time, str)
print(str)
with open(os.path.join(args.save_path, 'L0_mini_test.log'), mode='a', encoding='utf-8') as log_file:
console = sys.stdout
sys.stdout = log_file
print(str)
sys.stdout = console
log_file.close()
def WriteToComPort(com_port, cmd):
len = com_port.write(cmd.encode('utf-8'))
print('{}'.format(len))
return
def ReadFromComPort(com_port, timeout):
time_start = datetime.datetime.now()
time_end = time_start
#print((time_end - time_start).seconds)
global com_output
com_output = ''
while (time_end - time_start).seconds < timeout:
com_output_once = ''
while com_port.inWaiting() > 0:
com_output_once += com_port.read(com_port.inWaiting()).decode()
if com_output_once != '':
com_output += com_output_once
print('{}'.format(com_output_once), end='')
time_end = datetime.datetime.now()
return com_output
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--com_port', type=str, default = 'COM5')
parser.add_argument('--com_baudrate', type=int, default = 115200)
parser.add_argument('--save_path', type=str, default = 'D:\\DeviceTestTools\\screenshot')
parser.add_argument('--archive_path', type=str, default = 'Z:\workspace\ohos_L2\ohos\out\hispark_pegasus\hispark_pegasus_mini_system')
args = parser.parse_args()
com_port = serial.Serial(args.com_port, args.com_baudrate)
if com_port.isOpen():
PrintToLog("{} is open successed".format(com_port))
else:
PrintToLog("{} is open failed".format(com_port))
PrintToLog("End of check, test failed!")
sys.exit(99)
read_com_thread = threading.Thread(target=ReadFromComPort, args=(com_port, 10))
read_com_thread.setDaemon(True)
print('read wait:')
read_com_thread.start()
time.sleep(1)
WriteToComPort(com_port, '\r\n\r\n')
WriteToComPort(com_port, 'AT+SYSINFO\r\n')
print('enter AT+SYSINFO')
time.sleep(3)
hivew_proc_find = re.findall('hiview,id=\d{1,3},status=\d{1,10},pri=\d{1,3},size=', com_output)
#Bootstrap_proc_find = re.findall('Bootstrap,id=\d{1,3},status=\d{1,10},pri=\d{1,3},size=', com_output)
print(hivew_proc_find)
if type(hivew_proc_find) == list and len(hivew_proc_find) > 0:
PrintToLog('hivew_proc found')
else:
PrintToLog('hivew_proc not found')
PrintToLog("End of check, test failed!")
sys.exit(99)
target_file = os.path.normpath(os.path.join(args.archive_path, "OHOS_image.bin"))
ret_size = os.path.getsize(target_file)/1024/1024
PrintToLog('Size of OHOS_image.bin : {:.2f}M'.format(ret_size))
if ret_size > 1:
PrintToLog('ERROR: Size of OHOS_image.bin ({:.2f}M) is over the upper limit(1M)'.format(ret_size))
target_dir = os.path.normpath(os.path.join(args.archive_path, "libs"))
GetDirSize(target_dir)
PrintToLog("End of check, test failed!")
sys.exit(99)
PrintToLog("End of check, test succeeded!")
sys.exit(0)

View File

@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import parse
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime
import serial
import threading
def GetDirSize(dir_path):
if not os.path.exists(dir_path):
PrintToLog("\n\nERROR: %s, dir are not exist!!!\n" % dir_path)
PrintToLog("End of check, test failed!")
sys.exit(99)
size = 0
for root, dirs, files in os.walk(dir_path):
for name in files:
sz = os.path.getsize(os.path.join(root, name))
print('{} : {}byte'.format(os.path.join(root, name), sz))
size += sz
PrintToLog('total size: {:.2f}M'.format(size/1024/1024))
return size
def PrintToLog(str):
time = datetime.datetime.now()
str = "[{}] {}".format(time, str)
print(str)
with open(os.path.join(args.save_path, 'L1_mini_test.log'), mode='a', encoding='utf-8') as log_file:
console = sys.stdout
sys.stdout = log_file
print(str)
sys.stdout = console
log_file.close()
def WriteToComPort(com_port, cmd):
len = com_port.write(cmd.encode('utf-8'))
#print('{}'.format(len))
return
def ReadFromComPort(com_port, timeout):
time_start = datetime.datetime.now()
time_end = time_start
#print((time_end - time_start).seconds)
global com_output
com_output = ''
while (time_end - time_start).seconds < timeout:
com_output_once = ''
while com_port.inWaiting() > 0:
com_output_once += com_port.read(com_port.inWaiting()).decode()
if com_output_once != '':
com_output += com_output_once
print('{}'.format(com_output_once), end='')
time_end = datetime.datetime.now()
return com_output
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--com_port', type=str, default = 'COM8')
parser.add_argument('--com_baudrate', type=int, default = 115200)
parser.add_argument('--save_path', type=str, default = 'D:\\DeviceTestTools\\screenshot')
parser.add_argument('--archive_path', type=str, default = 'D:\DeviceTestTools')
args = parser.parse_args()
com_port = serial.Serial(args.com_port, args.com_baudrate)
if com_port.isOpen():
PrintToLog("{} is open successed".format(com_port))
else:
PrintToLog("{} is open failed".format(com_port))
PrintToLog("End of check, test failed!")
sys.exit(99)
res = com_port.write("free".encode('utf-8'));
read_com_thread = threading.Thread(target=ReadFromComPort, args=(com_port, 10))
read_com_thread.setDaemon(True)
print('read wait:')
read_com_thread.start()
time.sleep(1)
WriteToComPort(com_port, '\r\n\r\n\r\n')
WriteToComPort(com_port, 'task\r\n')
print('enter task')
time.sleep(3)
print(com_output)
foundation_proc_find = re.findall('\d{1,4}\s*\d{1,4}\s*\d{1,4}\s*\d{1,4}.\d{1,4}\s*foundation', com_output)
print(foundation_proc_find)
if type(foundation_proc_find) == list and len(foundation_proc_find) > 0:
PrintToLog('found foundation process')
else:
PrintToLog('not found foundation process')
PrintToLog("End of check, test failed!")
sys.exit(99)
shell_proc_find = re.findall('\d{1,4}\s*\d{1,4}\s*\d{1,4}\s*\d{1,4}.\d{1,4}\s*shell', com_output)
print(shell_proc_find)
if type(shell_proc_find) == list and len(shell_proc_find) > 0:
PrintToLog('found shell process')
else:
PrintToLog('not found shell process')
PrintToLog("End of check, test failed!")
sys.exit(99)
apphilogcat_proc_find = re.findall('\d{1,4}\s*\d{1,4}\s*\d{1,4}\s*\d{1,4}.\d{1,4}\s*apphilogcat', com_output)
print(apphilogcat_proc_find)
if type(apphilogcat_proc_find) == list and len(apphilogcat_proc_find) > 0:
PrintToLog('found apphilogcat process')
else:
PrintToLog('not found apphilogcat process')
PrintToLog("End of check, test failed!")
sys.exit(99)
deviceauth_service_proc_find = re.findall('\d{1,4}\s*\d{1,4}\s*\d{1,4}\s*\d{1,4}.\d{1,4}\s*deviceauth_service', com_output)
print(deviceauth_service_proc_find)
if type(deviceauth_service_proc_find) == list and len(deviceauth_service_proc_find) > 0:
PrintToLog('found deviceauth_service process')
else:
PrintToLog('not found deviceauth_service process')
PrintToLog("End of check, test failed!")
sys.exit(99)
softbus_server_proc_find = re.findall('\d{1,4}\s*\d{1,4}\s*\d{1,4}\s*\d{1,4}.\d{1,4}\s*softbus_server', com_output)
print(softbus_server_proc_find)
if type(softbus_server_proc_find) == list and len(softbus_server_proc_find) > 0:
PrintToLog('found softbus_server process')
else:
PrintToLog('not found softbus_server process')
PrintToLog("End of check, test failed!")
sys.exit(99)
mem_find = re.findall('Mem:\s*\d*\s*\d*\s*\d*\s*\d*', com_output)
print(mem_find)
if len(mem_find) > 0:
mem_size = int(mem_find[0].split()[2]) / 1024 / 1024
else:
PrintToLog('Error:can find memory usage info!')
PrintToLog("End of check, test failed!")
sys.exit(99)
if mem_size > 25:
PrintToLog(
'Error:memory usage is over the upper limit(25M),now is {:.2f}'.format(mem_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
target_dir = os.path.normpath(os.path.join(args.archive_path, "rootfs"))
ret_size = GetDirSize(target_dir)/1024/1024
PrintToLog('Size of rootfs is ({:.2f}M)'.format(ret_size))
if ret_size > 15:
PrintToLog('ERROR: Size of rootfs({:.2f}M) is over the upper limit(15M)'.format(ret_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
target_dir = os.path.normpath(os.path.join(args.archive_path, "userfs"))
ret_size = GetDirSize(target_dir)/1024/1024
PrintToLog('Size of userfs is ({:.2f}M)'.format(ret_size))
if ret_size > 15:
PrintToLog('ERROR: Size of userfs({:.2f}M) is over the upper limit(15M)'.format(ret_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
PrintToLog("End of check, test succeeded!")
sys.exit(0)

View File

@ -0,0 +1,217 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import parse
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime
def GetDirSize(dir_path):
if not os.path.exists(dir_path):
PrintToLog("\n\nERROR: %s, dir are not exist!!!\n" % dir_path)
PrintToLog("End of check, test failed!")
sys.exit(99)
size = 0
for root, dirs, files in os.walk(dir_path):
for name in files:
if not os.path.islink(os.path.join(root, name)):
sz = os.path.getsize(os.path.join(root, name))
#print('{} : {}byte'.format(os.path.join(root, name), sz))
size += sz
PrintToLog('total size: {:.2f}M'.format(size/1024/1024))
return size
def PrintToLog(str):
time = datetime.datetime.now()
str = "[{}] {}".format(time, str)
print(str)
with open(os.path.join(args.save_path, 'L2_mini_test_{}.log'.format(args.device_num)), mode='a', encoding='utf-8') as log_file:
console = sys.stdout
sys.stdout = log_file
print(str)
sys.stdout = console
log_file.close()
def EnterCmd(mycmd, waittime = 0, printresult = 1):
if mycmd == "":
return
global CmdRetryCnt
CmdRetryCnt = 1
EnterCmdRetry = 2
while EnterCmdRetry:
EnterCmdRetry -= 1
try:
p = subprocess.Popen(mycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result, unused_err = p.communicate(timeout=25)
try:
result=result.decode(encoding="utf-8")
except UnicodeDecodeError:
result=result.decode('gbk', errors='ignore')
break
except Exception as e:
result = 'retry failed again'
PrintToLog(e)
CmdRetryCnt += 1
p.kill()
if printresult == 1:
with open(os.path.join(args.save_path, 'mini_test_{}.bat'.format(args.device_num)), mode='a', encoding='utf-8') as cmd_file:
cmd_file.write(mycmd + '\n')
cmd_file.close()
PrintToLog(mycmd)
PrintToLog(result)
sys.stdout.flush()
if waittime != 0:
time.sleep(waittime)
if printresult == 1:
with open(os.path.join(args.save_path, 'mini_test_{}.bat'.format(args.device_num)), mode='a', encoding='utf-8') as cmd_file:
cmd_file.write("ping -n {} 127.0.0.1>null\n".format(waittime))
cmd_file.close()
return result
def EnterShellCmd(shellcmd, waittime = 0, printresult = 1):
if shellcmd == "":
return
cmd = "hdc_std -t {} shell \"{}\"".format(args.device_num, shellcmd)
return EnterCmd(cmd, waittime, printresult)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--save_path', type=str, default = 'D:\\DeviceTestTools\\screenshot')
parser.add_argument('--device_num', type=str, default = 'null')
parser.add_argument('--archive_path', type=str, default = 'Z:\workspace\ohos_L2\ohos\out\\rk3568\packages\phone')
args = parser.parse_args()
if args.device_num == 'null':
result = EnterCmd("hdc_std list targets", 1, 0)
print(result)
args.device_num = result.split()[0]
PrintToLog("\n\n########## First check key processes start ##############")
lose_process = []
process_pid = {}
two_check_process_list = ['huks_service', 'hilogd', 'hdf_devmgr', 'samgr', 'foundation', 'accesstoken_ser',]
other_process_list = ['softbus_server', 'deviceauth_service']
for pname in two_check_process_list:
pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
int(pidlist[0])
for pid in pidlist:
int(pid)
process_pid[pname] = pidlist
except:
lose_process.append(pname)
all_p = EnterShellCmd("ps -elf")
for pname in other_process_list:
findp = all_p.find(pname, 0, len(all_p))
if findp == -1:
lose_process.append(pname)
if lose_process:
PrintToLog("\n\nERROR: %s, These processes are not exist!!!\n" % lose_process)
PrintToLog("End of check, test failed!")
sys.exit(99)
else:
PrintToLog("First processes check is ok\n")
# check processes usage
res = EnterCmd("hdc_std -t {} shell hidumper --mem".format(args.device_num), 0, 1)
process_usage = int(res.split(':')[-1].split()[0]) / 1024
if process_usage > 40:
PrintToLog(
"ERROR: Processes usage cannot be greater than 40M, but currently it's actually %.2fM" % process_usage)
PrintToLog("End of check, test failed!")
sys.exit(99)
time.sleep(10)
#key processes second check, and cmp to first check
PrintToLog("\n\n########## Second check key processes start ##############")
second_check_lose_process = []
#for pname in two_check_process_list + other_process_list:
for pname in two_check_process_list:
pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
if process_pid[pname] != pidlist:
if pname in two_check_process_list:
PrintToLog("ERROR: pid of %s is different the first check" % pname)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
sys.exit(99)
else:
PrintToLog("WARNNING: pid of %s is different the first check" % pname)
elif len(pidlist) != 1:
if pname in two_check_process_list:
PrintToLog("ERROR: pid of %s is not only one" % pname)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
sys.exit(99)
else:
PrintToLog("WARNNING: pid of %s is not only one" % pname)
except:
second_check_lose_process.append(pname)
if second_check_lose_process:
PrintToLog("ERROR: pid of %s is not exist" % pname)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
sys.exit(99)
else:
PrintToLog("Second processes check is ok\n")
target_dir = os.path.normpath(os.path.join(args.archive_path, "system"))
PrintToLog(target_dir)
ret_size = GetDirSize(target_dir)/1024/1024
PrintToLog('Size of system is :{:.2f}M'.format(ret_size))
if ret_size > 50:
PrintToLog('ERROR: Size of system({:.2f}M) is over the upper limit(50M)'.format(ret_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
target_dir = os.path.normpath(os.path.join(args.archive_path, "data"))
ret_size = GetDirSize(target_dir)/1024/1024
PrintToLog('Size of data is :{:.2f}M'.format(ret_size))
if ret_size > 50:
PrintToLog('ERROR: Size of data({:.2f}M) is over the upper limit(50M)'.format(ret_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
target_dir = os.path.normpath(os.path.join(args.archive_path, "updater"))
ret_size = GetDirSize(target_dir)/1024/1024
PrintToLog('Size of updater is :{:.2f}M'.format(ret_size))
if ret_size > 50:
PrintToLog('ERROR: Size of updater({:.2f}M) is over the upper limit(50M)'.format(ret_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
target_dir = os.path.normpath(os.path.join(args.archive_path, "vendor"))
ret_size = GetDirSize(target_dir)/1024/1024
PrintToLog('Size of vendor is :{:.2f}M'.format(ret_size))
if ret_size > 50:
PrintToLog('ERROR: Size of vendor({:.2f}M) is over the upper limit(50M)'.format(ret_size))
PrintToLog("End of check, test failed!")
sys.exit(99)
PrintToLog("All testcase is ok")
PrintToLog("End of check, test succeeded!")
sys.exit(0)

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="utf-8"?><!--
Copyright (C) 2021 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--><user_config>
<environment>
<device type="usb-hdc">
<ip/>
<port/>
<sn>7001005458323933328a017ce1b13800;7001005458323933328a258f3d043900</sn>
</device>
<device label="wifiiot" type="com">
<serial>
<com/>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com/>
<type>deploy</type>
<baud_rate>115200</baud_rate>
</serial>
</device>
<device label="ipcamera" type="com">
<serial>
<com/>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device label="ipcamera" type="com">
<ip/>
<port/>
</device>
</environment>
<testcases>
<dir/>
<server label="NfsServer">
<ip/>
<port/>
<dir/>
<username/>
<password/>
<remote/>
</server>
</testcases>
<resource>
<dir/>
</resource>
<loglevel>DEBUG</loglevel>
<devicelog>ON</devicelog>
</user_config>

View File

@ -0,0 +1,9 @@
import sys
from xdevice.__main__ import main_process
from testcases.set_sn import get_devices_sn
if __name__ == '__main__':
get_devices_sn()
argv = "{} {} {}".format(sys.argv[1], sys.argv[2], sys.argv[3])
print(">>>>>>>:{}".format(argv))
main_process(argv)

View File

@ -0,0 +1,17 @@
{
"description": "Config for OpenHarmony devicetest test cases",
"environment": [
{
"type": "device",
"label": "phone"
},
{
"type": "device",
"label": "phone"
}
],
"driver": {
"type": "DeviceTest",
"py_file": ["DistributedTest.py"]
}
}

View File

@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
import time
import threading
import re
from devicetest.core.test_case import TestCase
from devicetest.aw.OpenHarmony import CommonOH
from testcases.orc import Orc
class DistributedTest(TestCase):
def __init__(self, controllers):
self.TAG = self.__class__.__name__
self.tests = [
# 设备组网
"sub_distributed_smoke_testcase_0100",
# pin码连接
"sub_distributed_smoke_testcase_0200",
# 结果校验
"sub_distributed_smoke_testcase_0300"
]
TestCase.__init__(self, self.TAG, controllers)
def setup(self):
print("预置工作:初始化设备开始...........................")
print(self.devices[0].device_id)
print(self.devices[1].device_id)
def sub_distributed_smoke_testcase_0100(self):
t1 = threading.Thread(target=self.net_connect1)
t2 = threading.Thread(target=self.net_connect2)
t1.start()
t2.start()
t1.join()
t2.join()
def sub_distributed_smoke_testcase_0200(self):
CommonOH.startAbility(self.Phone1, "ohos.samples.distributedcalc.MainAbility", "ohos.samples.distributedcalc")
time.sleep(2)
# 授权
CommonOH.click(self.Phone1, 500, 1130)
CommonOH.click(self.Phone1, 500, 1130)
CommonOH.hdc_std(self.Phone1, "shell snapshot_display -f /data/distributedcalc_step1.jpeg")
CommonOH.hdc_std(self.Phone1, "file recv /data/distributedcalc_step1.jpeg testcases\\distributedcalc_step1.jpeg")
CommonOH.click(self.Phone1, 610, 110)
time.sleep(2)
CommonOH.click(self.Phone1, 380, 1150)
CommonOH.click(self.Phone1, 610, 110)
time.sleep(2)
CommonOH.hdc_std(self.Phone1, "shell snapshot_display -f /data/distributedcalc_step2.jpeg")
CommonOH.hdc_std(self.Phone1, "file recv /data/distributedcalc_step2.jpeg testcases\\distributedcalc_step2.jpeg")
CommonOH.click(self.Phone1, 580, 1090)
time.sleep(2)
CommonOH.hdc_std(self.Phone2, "shell snapshot_display -f /data/distributedcalc_step3.jpeg")
CommonOH.hdc_std(self.Phone2, "file recv /data/distributedcalc_step3.jpeg testcases\\distributedcalc_step3.jpeg")
#确定
CommonOH.click(self.Phone2, 520, 520)
CommonOH.click(self.Phone2, 520, 520)
CommonOH.hdc_std(self.Phone2, "shell snapshot_display -f /data/distributedcalc_step4.jpeg")
CommonOH.hdc_std(self.Phone2, "file recv /data/distributedcalc_step4.jpeg testcases\\distributedcalc_step4.jpeg")
time.sleep(2)
code = Orc("testcases\\distributedcalc_step4.jpeg")
self.code = re.findall("[0-9]{6}", code)[0]
#输pin码
CommonOH.click(self.Phone1, 340, 530)
CommonOH.click(self.Phone1, 340, 530)
time.sleep(1)
#切换至数字输入
CommonOH.click(self.Phone1, 60, 1145)
time.sleep(1)
for i in self.code:
if i == "0":
CommonOH.click(self.Phone1, 676, 778)
else:
j = int(i) - 1
CommonOH.click(self.Phone1, 46 + j * 70, 778)
time.sleep(1)
CommonOH.click(self.Phone1, 60, 1145)
CommonOH.hdc_std(self.Phone1, "shell snapshot_display -f /data/distributedcalc_step5.jpeg")
# 确定
CommonOH.click(self.Phone1, 500, 600)
CommonOH.hdc_std(self.Phone1, "file recv /data/distributedcalc_step5.jpeg testcases\\distributedcalc_step5.jpeg")
def sub_distributed_smoke_testcase_0300(self):
# 切入后台,结束进程
CommonOH.click(self.Phone1, 512, 1246)
CommonOH.click(self.Phone1, 360, 1168)
# 重启计算器应用
CommonOH.startAbility(self.Phone1, "ohos.samples.distributedcalc.MainAbility", "ohos.samples.distributedcalc")
time.sleep(2)
# 拉起远端设备
CommonOH.click(self.Phone1, 610, 110)
time.sleep(3)
CommonOH.hdc_std(self.Phone1, "shell snapshot_display -f /data/distributedcalc_step6.jpeg")
CommonOH.hdc_std(self.Phone1, "file recv /data/distributedcalc_step6.jpeg testcases\\distributedcalc_step6.jpeg")
CommonOH.click(self.Phone1, 580, 1090)
CommonOH.click(self.Phone1, 580, 1090)
# 设备二授权
time.sleep(2)
CommonOH.click(self.Phone2, 500, 1130)
CommonOH.click(self.Phone2, 500, 1130)
# 校验远端计算器是否被拉起
CommonOH.hdc_std(self.Phone2, "shell snapshot_display -f /data/distributedcalc_step7.jpeg")
CommonOH.hdc_std(self.Phone2, "file recv /data/distributedcalc_step7.jpeg testcases\\distributedcalc_step7.jpeg")
CommonOH.hdc_std(self.Phone2, 'shell "aa dump -a | grep distributedcalc > /data/report.txt"')
CommonOH.hdc_std(self.Phone2, "file recv /data/report.txt testcases\\report.txt")
time.sleep(1)
CommonOH.hdc_std(self.Phone1, "file send testcases\\report.txt /data/report.txt")
def net_connect1(self):
# 点亮屏幕
CommonOH.wake(self.Phone1)
# 设置不息屏
CommonOH.hdc_std(self.Phone1, 'shell "power-shell setmode 602"')
def net_connect2(self):
# 点亮屏幕
CommonOH.wake(self.Phone2)
# 设置不息屏
CommonOH.hdc_std(self.Phone2, 'shell "power-shell setmode 602"')
def teardown(self):
# 切入后台,结束进程
CommonOH.hdc_std(self.Phone1, "shell killall ohos.samples.distributedcalc")
CommonOH.hdc_std(self.Phone2, "shell killall ohos.samples.distributedcalc")

View File

@ -0,0 +1,10 @@
import pytesseract
from PIL import Image
def Orc(path):
pytesseract.pytesseract.tesseract_cmd = 'C:\\Program Files (x86)\\Tesseract-OCR\\tesseract.exe'
tessdata_dir_config = '--tessdata-dir "C:\\Program Files (x86)\\Tesseract-OCR\\tessdata"'
image = Image.open(path)
code = pytesseract.image_to_string(image, config=tessdata_dir_config)
return code

View File

@ -0,0 +1,21 @@
import encodings
import os
import re
from xml.dom.minidom import parse
def get_devices_sn():
cmd_sn = os.popen("hdc_std list targets").read()
device_sn = re.findall('[\w+]{32}', cmd_sn) + re.findall('[\w+]{16}', cmd_sn)
dom_tree = parse('config\\user_config.xml')
collection = dom_tree.documentElement
sn1 = collection.getElementsByTagName('sn')[0]
if len(device_sn[0]) == len(device_sn[1]):
sn1.childNodes[0].data = "{};{}".format(device_sn[0], device_sn[1])
else:
sn1.childNodes[0].data = device_sn[0]
with open('config\\user_config.xml', 'w', encoding='utf-8') as f:
dom_tree.writexml(f, encoding='utf-8')
f.close()
if __name__ == '__main__':
get_devices_sn()

Binary file not shown.

View File

@ -0,0 +1,39 @@
#!/system/bin/sh
# udhcpc script edited by Tim Riker <Tim@Rikers.org>
[ -z "$1" ] && echo "Error: should be called from udhcpc" && exit 1
RESOLV_CONF="/etc/resolv.conf"
[ -n "$broadcast" ] && BROADCAST="broadcast $broadcast"
[ -n "$subnet" ] && NETMASK="netmask $subnet"
case "$1" in
deconfig)
/system/bin/ifconfig $interface 0.0.0.0
;;
renew|bound)
/system/bin/ifconfig $interface $ip $BROADCAST $NETMASK
if [ -n "$router" ] ; then
echo "deleting routers"
while ./busybox route del default gw 0.0.0.0 dev $interface ; do
:
done
for i in $router ; do
./busybox route add default gw $i dev $interface
done
fi
echo -n > $RESOLV_CONF
[ -n "$domain" ] && echo search $domain >> $RESOLV_CONF
for i in $dns ; do
echo adding dns $i
echo nameserver $i >> $RESOLV_CONF
done
;;
esac
exit 0

View File

@ -0,0 +1,8 @@
country=GB
#update_config=1
#ap_scan=1
network={
ssid="testapold"
psk="passw0rd1!"
}

View File

@ -0,0 +1,145 @@
[
{
"DEVICE_1":[1, 2, 3, 8, 10, 12],
"DEVICE_2":[4, 5, 6, 7, 9, 11],
"recent-x-y":[515, 1240],
"recent_del-x-y":[360, 1170],
"permisson_ok-x-y":[500, 1130],
"note_content-x-y":[500, 310],
"take_photos-x-y":[360, 1095],
"convert_to_video-x-y":[430, 980],
"convert_to_photos-x-y":[200, 1095],
"last_photos-x-y":[100, 220],
"stop_video-x-y":[320, 1095],
"phone-x-y":[645, 1060],
"screenshot-x-y":[115,480],
"shot_cmd":[""],
"remount":["mount -o rw,remount"],
"stop_hilog":["hilog -w stop"],
"cmp_cmd-level":["", 443200],
"get_file_from_dev":[""],
"send_file_to_dev":["", ""]
},
{
"app_name": "crash_check",
"entry": "",
"compress_file_recv":["cd /data/log/faultlog/temp && tar -cf crash_log.tar cppcrash*"],
"all_actions": [
[1, "remount"], [1, "process_crash_check", "foundation"], [1, "process_crash_check", "render_service"], [1, "process_crash_check", "appspawn"], [1, "compress_file_recv"],
[1, "get_file_from_dev", "/data/log/faultlog/temp/crash_log.tar"]
]
},
{
"app_name": "notification_bar",
"entry": "",
"pull_down_cmd":["uinput -T -m 500 0 550 30"],
"swipe_up_cmd":["uinput -T -m 500 500 550 300"],
"all_actions": [
[2, "pull_down_cmd"], [2, "pull_down_cmd"], [2, "shot_cmd"], [2, "cmp_cmd-level"], [1, "swipe_up_cmd"], [1, "swipe_up_cmd"]
]
},
{
"app_name": "wifi_connect",
"entry": "",
"check_ping_baidu":["ping www.baidu.com", "64 bytes from"],
"all_actions": [
[2, "connect_wifi"], [1, "check_ping_baidu"]
]
},
{
"app_name": "video_test",
"entry": "",
"mk_test_dir":["mkdir -p /data/app/el2/100/base/ohos.acts.multimedia.video.videoplayer/haps/entry/files"],
"start_video_log":["rm /data/log/hilog/* && hilog -r && hilog -Q pidoff;hilog -G 512M;hilog -w start -l 400000000 -m none"],
"start_test":["aa test -p ohos.acts.multimedia.video.videoplayer -b ohos.acts.multimedia.video.videoplayer -s unittest OpenHarmonyTestRunner -w 2000000 -s timeout 60000", "Failure: 0, Error: 0, Pass: 1"],
"compress_log":["cd /data/log/hilog && tar -cf video_log.tar *"],
"kill_video": ["killall ohos.acts.multimedia.video.videoplayer"],
"all_actions": [
[1,"start_video_log"], [2, "install_hap", "vediotest/ActsVideoPlayerJsTest.hap"], [1, "mk_test_dir"], [1, "remount"],
[1, "send_file_to_dev", "vediotest/H264_AAC.mp4", "/data/app/el2/100/base/ohos.acts.multimedia.video.videoplayer/haps/entry/files/"],
[5, "start_test"], [1, "stop_hilog"], [1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/video_log.tar"], [1, "kill_video"]
]
},
{
"app_name": "camera",
"entry": "",
"check_result":["cd /data/log/hilog && grep -nr PreviewOutputCallback", "OnFrameStarted"],
"compress_log":["cd /data/log/hilog && tar -cf camera_log.tar *"],
"open_camera_log":["rm /data/log/hilog/* && hilog -b X;hilog -b D -T CAMERA;hilog -r"],
"start_camera":["aa start -a com.ohos.camera.MainAbility -b com.ohos.camera"],
"recover_log":["cd data/log/hilog/;hilog -x > camera_log.txt;hilog -b D"],
"kill_camera": ["killall com.ohos.camera"],
"kill_photos": ["killall com.ohos.photos"],
"all_actions": [
[1, "open_camera_log"], [5, "start_camera"], [3, "take_photos-x-y"], [2, "convert_to_video-x-y"], [3, "take_photos-x-y"], [2, "stop_video-x-y"], [6, "convert_to_photos-x-y"],
[2, 670, 40], [1, "recover_log"], [1, "check_result"], [1, "shot_cmd", "camera_to_photos"], [1, "cmp_cmd-level", 1900000], [1, "compress_log"],
[1, "get_file_from_dev", "/data/log/hilog/camera_log.tar"], [1, "kill_camera"], [1, "kill_photos"]
]
},
{
"app_name": "audio_render",
"entry": "",
"remount":["mount -o rw,remount /"],
"chmodfile1":["chmod 777 /data/audio_renderer_unit_test"],
"audio_render_test":["cd /data && ./audio_renderer_unit_test --gtest_filter=AudioRendererUnitTest.Audio_Renderer_Playback_001 && rm *.xml", "[ PASSED ] 1 test"],
"all_actions": [
[1, "remount"], [1, "send_file_to_dev", "audiotest/audio_renderer_unit_test", "/data/"], [1, "send_file_to_dev", "audiotest/test_44100_2.wav", "/data/"], [2, "chmodfile1"],
[5, "audio_render_test"]
]
},
{
"app_name": "photos",
"entry": "",
"pull_down_cmd":["uinput -T -m 500 0 550 30"],
"start_screenshot": ["aa start -a com.ohos.screenshot.ServiceExtAbility -b com.ohos.screenshot"],
"start_photos": ["aa start -a com.ohos.photos.MainAbility -b com.ohos.photos"],
"kill_photos": ["killall com.ohos.photos"],
"all_actions": [
[2, "pull_down_cmd"], [5, "screenshot-x-y"], [5, "start_photos"], [2, "last_photos-x-y"], [2, "shot_cmd"], [1, "cmp_cmd-level"],
[1, "get_file_from_dev", "/data/app/el2/100/database/com.ohos.medialibrary.medialibrarydata/rdb/media_library.db"],
[1, "get_file_from_dev", "/data/app/el2/100/database/com.ohos.medialibrary.medialibrarydata/rdb/media_library.db-wal"],
[2, "photo_check"], [1, "process_check", "com.ohos.medialibrary.medialibrarydata"], [2, "sandbox_path_check"], [1, "kill_photos"]
]
},
{
"app_name": "settings",
"entry": "aa start -a com.ohos.settings.MainAbility -b com.ohos.settings",
"kill_settings": ["killall com.ohos.settings"],
"all_actions": [
[2, "shot_cmd", "settings"], [1, "cmp_cmd-level"], [1, "kill_settings"]
]
},
{
"app_name": "note",
"entry": "aa start -a MainAbility -b com.ohos.note",
"kill_note": ["killall com.ohos.note"],
"all_actions": [
[2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [5, "note_content-x-y"], [2, "note_content-x-y"], [2, "shot_cmd"], [1, "cmp_cmd-level"], [2, "recent-x-y"], [1, "recent_del-x-y"]
]
},
{
"app_name": "contacts",
"entry": "aa start -a com.ohos.contacts.MainAbility -b com.ohos.contacts",
"kill_contacts": ["killall com.ohos.contacts"],
"all_actions": [
[2, "phone-x-y"], [2, "phone-x-y"], [2, "shot_cmd"], [1, "cmp_cmd-level"], [1, "kill_contacts"]
]
},
{
"app_name": "mms",
"entry": "aa start -a com.ohos.mms.MainAbility -b com.ohos.mms",
"kill_mms": ["killall com.ohos.mms"],
"all_actions": [
[2, "shot_cmd"], [1, "cmp_cmd-level"], [1, "kill_mms"]
]
},
{
"app_name": "distributedmusicplayer",
"entry": "aa start -a ohos.samples.distributedmusicplayer.MainAbility -b ohos.samples.distributedmusicplayer",
"kill_distributedmusicplayer": ["killall ohos.samples.distributedmusicplayer"],
"all_actions": [
[2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "shot_cmd"], [1, "cmp_cmd-level", 960000], [1, "kill_distributedmusicplayer"]
]
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 325 KiB

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,637 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import parse
import encodings
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime
import sqlite3
import shutil
def PrintToLog(str):
time = datetime.datetime.now()
str = "[{}] {}".format(time, str)
print(str)
with open(os.path.join(args.save_path, 'test_{}.log'.format(args.device_num)),\
mode='a', encoding='utf-8') as log_file:
console = sys.stdout
sys.stdout = log_file
print(str)
sys.stdout = console
log_file.close()
def EnterCmd(mycmd, waittime=0, printresult=1):
if mycmd == "":
return
global CmdRetryCnt
CmdRetryCnt = 1
EnterCmdRetry = 2
while EnterCmdRetry:
EnterCmdRetry -= 1
try:
p = subprocess.Popen(mycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result, unused_err = p.communicate(timeout=25)
try:
result=result.decode(encoding="utf-8")
except UnicodeDecodeError:
result=result.decode('gbk', errors='ignore')
break
except Exception as e:
result = 'retry failed again'
PrintToLog(e)
CmdRetryCnt += 1
p.kill()
if printresult == 1:
with open(os.path.join(args.save_path, 'test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_f:
cmd_f.write(mycmd + '\n')
cmd_f.close()
PrintToLog(mycmd)
PrintToLog(result)
sys.stdout.flush()
if waittime != 0:
time.sleep(waittime)
if printresult == 1:
with open(os.path.join(args.save_path, 'test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_f:
cmd_f.write("ping -n {} 127.0.0.1>null\n".format(waittime))
cmd_f.close()
return result
def EnterShellCmd(shellcmd, waittime=0, printresult=1):
if shellcmd == "":
return
cmd = "hdc_std -t {} shell \"{}\"".format(args.device_num, shellcmd)
return EnterCmd(cmd, waittime, printresult)
def SysExit():
EnterShellCmd("cd /data/log/faultlog/temp && tar -cf after_test_cppcrash{}.tar cppcrash*".format(args.device_num))
GetFileFromDev("/data/log/faultlog/temp/after_test_cppcrash{}.tar".format(args.device_num), \
os.path.normpath(args.save_path))
EnterShellCmd("cd /data/log/faultlog/faultlogger && tar -cf after_test_jscrash{}.tar jscrash*".format(args.device_num))
GetFileFromDev("/data/log/faultlog/faultlogger/after_test_jscrash{}.tar".format(args.device_num), \
os.path.normpath(args.save_path))
PrintToLog("SmokeTest:: SmokeTest find some key problems!")
PrintToLog("SmokeTest:: End of check, test failed!")
sys.exit(98)
def SendFileToDev(src, dst):
cmd = "hdc_std -t {} file send \"{}\" \"{}\"".format(args.device_num, src, dst)
return EnterCmd(cmd, 1, 1)
def GetFileFromDev(src, dst):
cmd = "hdc_std -t {} file recv \"{}\" \"{}\"".format(args.device_num, src, dst)
return EnterCmd(cmd, 1, 1)
def ImageCheck(str, testnum=1):
conn = sqlite3.connect(str)
cursor_image = conn.cursor()
cursor_video = conn.cursor()
PrintToLog("SmokeTest:: start to check media library path")
try:
PrintToLog("SmokeTest:: select * from files where mime_type = image/*")
cursor_image.execute("""select * from files where mime_type = "image/*" """)
except:
PrintToLog("SmokeTest:: error: media_library.db cannot be found, please check media library path")
return -1
if testnum == 2:
try:
PrintToLog("SmokeTest:: select * from files where mime_type = video/mp4")
cursor_video.execute("""select * from files where mime_type = "video/mp4" """)
except:
PrintToLog("SmokeTest:: error: media_library.db cannot be found, please check media library path")
return -1
PrintToLog("SmokeTest:: media library is ok")
image_result = cursor_image.fetchone()
video_result = cursor_video.fetchone()
PrintToLog("SmokeTest:: image: {}".format(image_result))
PrintToLog("SmokeTest:: video: {}".format(video_result))
PrintToLog("SmokeTest:: start to check photos and videos in the album")
if image_result is not None and testnum == 1:
PrintToLog("SmokeTest:: success: There are some photos in the album!!")
return 1
elif image_result is not None and video_result is not None and testnum == 2:
PrintToLog("SmokeTest:: success: There are some photos and videos in the album!!")
return 1
else:
PrintToLog("SmokeTest:: error: There are no photos or videos in the album!!")
return -1
def ConnectionJudgment():
connection_status = EnterCmd("hdc_std list targets", 2)
connection_cnt = 0
while args.device_num not in connection_status and connection_cnt < 15:
connection_status = EnterCmd("hdc_std list targets", 2)
connection_cnt += 1
if connection_cnt == 15:
PrintToLog("SmokeTest:: Device disconnection!!")
PrintToLog("SmokeTest:: End of check, test failed!")
sys.exit(101)
def sandbox_check(process):
PrintToLog("SmokeTest:: start to check sandbox path")
medialibrarydata_pidnum = EnterShellCmd("pgrep -f {}".format(process), 1)
medialibrarydata_pidnum = medialibrarydata_pidnum.strip()
sandboxf = EnterShellCmd("echo \"ls /storage/media/local/\"|nsenter -t {} -m sh".format(medialibrarydata_pidnum), 1)
if "files" not in sandboxf:
PrintToLog("SmokeTest:: error: can not find sandbox path : /storage/media/local/files")
return -1
else:
PrintToLog("SmokeTest:: success: find sandbox path : /storage/media/local/files")
return 1
def is_image_file(filename):
IMG_EXTENSIONS = ['.png', '.PNG', 'jpeg']
return any(filename.endswith(extension) for extension in IMG_EXTENSIONS)
def picture_save(pic_path):
for root, dirs, files in os.walk(pic_path):
for file in files:
file_path = os.path.join(root, file)
if is_image_file(file_path):
shutil.copy2(file_path, args.save_path)
PrintToLog("SmokeTest:: send {} to save_path Successfully".format(file_path))
def ConnectToWifi(tools_path):
EnterShellCmd("mkdir /data/l2tool", 1)
SendFileToDev(os.path.normpath(os.path.join(tools_path, "l2tool/busybox")), "/data/l2tool/")
SendFileToDev(os.path.normpath(os.path.join(tools_path, "l2tool/dhcpc.sh")), "/data/l2tool/")
SendFileToDev(os.path.normpath(os.path.join(tools_path, "l2tool/wpa_supplicant.conf")), "/data/l2tool/")
EnterShellCmd("wpa_supplicant -B -d -i wlan0 -c /data/l2tool/wpa_supplicant.conf", 1)
EnterShellCmd("chmod 777 ./data/l2tool/busybox", 1)
cnt = 2
while cnt:
try:
PrintToLog("SmokeTest:: hdc_std shell ./data/l2tool/busybox udhcpc -i wlan0 -s /data/l2tool/dhcpc.sh")
p = subprocess.check_output(shlex.split("hdc_std -t {} shell ./data/l2tool/busybox udhcpc -i wlan0 -s \
/data/l2tool/dhcpc.sh".format(args.device_num)), timeout=8)
PrintToLog(p.decode(encoding="utf-8"))
with open(os.path.join(args.save_path, 'test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_f:
cmd_f.write('hdc_std shell ./data/l2tool/busybox udhcpc -i wlan0 -s /data/l2tool/dhcpc.sh' + '\n')
cmd_f.close()
ret_code = 0
except subprocess.TimeoutExpired as time_e:
PrintToLog(time_e)
ret_code = 1
if ret_code == 0:
ip = re.findall(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", p.decode(encoding="utf-8"))
PrintToLog(ip)
if len(ip) <= 0:
break
if len(re.findall(r'(?<!\d)\d{1,3}\.\d{1,3}\.\d{1,3}(?=\.\d)', ip[0])) <= 0:
break
gate = str(re.findall(r'(?<!\d)\d{1,3}\.\d{1,3}\.\d{1,3}(?=\.\d)', ip[0])[0]) + ".1"
PrintToLog(gate)
ifconfig="ifconfig wlan0 {}".format(ip[0])
EnterShellCmd(ifconfig)
routeconfig="./data/l2tool/busybox route add default gw {} dev wlan0".format(gate)
EnterShellCmd(routeconfig)
break
PrintToLog("try {}".format(cnt))
cnt -= 1
time.sleep(5)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--config', type=str, default = './app_capture_screen_test_config.json')
parser.add_argument('--test_num', type=str, default = '1/1')
parser.add_argument('--tools_path', type=str, default = 'D:\\DeviceTestTools\\screenshot\\')
parser.add_argument('--anwser_path', type=str, default = 'D:\\DeviceTestTools\\screenshot\\resource')
parser.add_argument('--save_path', type=str, default = 'D:\\DeviceTestTools\\screenshot')
parser.add_argument('--device_num', type=str, default = 'null')
parser.add_argument('--pr_url', type=str, default = 'https://gitee.com/openharmony/applications_sample_wifi_iot/')
args = parser.parse_args()
if args.device_num == 'null':
result = EnterCmd("hdc_std list targets", 1, 0)
print(result)
args.device_num = result.split()[0]
with open(args.config) as f:
all_app = json.load(f)
cmp_status = 0
global_pos = all_app[0]
rebootcnt = 2
while rebootcnt:
rebootcnt -= 1
EnterCmd("hdc_std list targets", 1)
EnterShellCmd("mkdir -p /data/screen_test/train_set")
SendFileToDev(os.path.normpath(os.path.join(args.tools_path, "resource/printscreen")), "/data/screen_test/")
EnterShellCmd("chmod 777 /data/screen_test/printscreen")
rmlockcnt = 3
while rmlockcnt:
EnterShellCmd("uinput -T -m 425 1000 425 400;power-shell wakeup;uinput -T -m 425 400 425 1000;\
power-shell setmode 602;uinput -T -m 425 1000 425 400;", 1)
rmlockcnt -= 1
EnterShellCmd("hilog -w stop", 1)
EnterShellCmd("cd /data/log/hilog && tar -cf system_start_log_{}.tar *".format(args.device_num), 1)
GetFileFromDev("/data/log/hilog/system_start_log_{}.tar".format(args.device_num), args.save_path)
SendFileToDev(os.path.normpath(os.path.join(args.anwser_path, "launcher.pngraw")),\
"/data/screen_test/train_set")
EnterShellCmd("/data/screen_test/printscreen -f /data/screen_test/launcher_{}.png".format(args.device_num), 1)
GetFileFromDev("/data/screen_test/launcher_{}.pngraw".format(args.device_num), args.save_path)
GetFileFromDev("/data/screen_test/launcher_{}.png".format(args.device_num), args.save_path)
ConnectionJudgment()
cmp_launcher = "cmp -l /data/screen_test/launcher_{}.pngraw \
/data/screen_test/train_set/launcher.pngraw | wc -l".format(args.device_num)
p = EnterShellCmd(cmp_launcher, 1)
num = re.findall(r'[-+]?\d+', p)
PrintToLog(num)
if type(num) == list and len(num) > 0 and int(num[0]) < 147456 and\
p.find('No such file or directory', 0, len(p)) == -1:
PrintToLog("SmokeTest:: launcher screenshot comparison is ok!")
break
elif rebootcnt >= 1:
PrintToLog("SmokeTest:: launcher screenshot comparison failed, reboot and try!!!")
EnterShellCmd("rm -rf /data/*;reboot")
for i in range(5):
EnterCmd("hdc_std list targets", 10)
else:
PrintToLog("SmokeTest:: launcher screenshot comparison failed")
SysExit()
EnterShellCmd("cat /proc/`pidof foundation`/smaps_rollup")
PrintToLog("\nSmokeTest:: ########## First check key processes start ##############")
lose_process = []
process_pid = {}
with open(os.path.normpath(os.path.join(args.tools_path, "resource/process.txt")), "r+") as f:
text = f.read()
two_check_process_list = text.split('#####')[1].split()[0:-1]
other_process_list = text.split('#####')[2].split()
for pname in two_check_process_list:
pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
int(pidlist[0])
for pid in pidlist:
int(pid)
process_pid[pname] = pidlist
except:
lose_process.append(pname)
all_p = EnterShellCmd("ps -elf")
for pname in other_process_list:
findp = all_p.find(pname, 0, len(all_p))
if findp == -1:
lose_process.append(pname)
if lose_process:
PrintToLog("SmokeTest:: error: %s, These processes are not exist!!!" % lose_process)
SysExit()
else:
PrintToLog("SmokeTest:: first processes check is ok")
power_state = EnterShellCmd("hidumper -s 3308", 1)
if "State=2" not in power_state:
PrintToLog("SmokeTest:: DISPLAY POWER MANAGER DUMP State=0")
SysExit()
if "1/2" in args.test_num or "2/2" in args.test_num:
EnterShellCmd("param set persist.ace.testmode.enabled 1", 1)
EnterShellCmd("rm /data/log/hilog/*;hilog -r;hilog -Q pidoff;hilog -Q domainoff;hilog -G 512M;hilog -b D", 1)
picture_path = os.path.normpath(os.path.join(args.tools_path, "DistributedTest\\testcases"))
report_path = os.path.normpath(os.path.join(args.tools_path, "DistributedTest\\testcases\\report.txt"))
if args.test_num == "2/2":
EnterShellCmd("ifconfig eth0 192.168.0.1", 1)
ping_result = EnterShellCmd("ping 192.168.0.2 -i 1 -c 2", 3)
file_is_exist = EnterShellCmd("cd /data; find . -name report.txt")
ping_cnt = 0
wait_cnt = 0
while "2 packets transmitted, 2 received" not in ping_result and ping_cnt < 60:
ping_result = EnterShellCmd("ping 192.168.0.2 -i 1 -c 2", 3)
ping_cnt += 1
if ping_cnt == 60:
PrintToLog("SmokeTest:: Ping failed, timeout of 3 minutes")
SysExit()
while "report.txt" not in file_is_exist and wait_cnt < 60:
PrintToLog("SmokeTest:: waiting for the distributed test to end ")
file_is_exist = EnterShellCmd("cd /data; find . -name report.txt", 3)
wait_cnt += 1
elif args.test_num == "1/2":
EnterShellCmd("ifconfig eth0 192.168.0.2", 1)
ping_result = EnterShellCmd("ping 192.168.0.1 -i 1 -c 2", 3)
ping_cnt = 0
while "2 packets transmitted, 2 received" not in ping_result and ping_cnt < 60:
ping_result = EnterShellCmd("ping 192.168.0.1 -i 1 -c 2", 3)
ping_cnt += 1
if ping_cnt == 60:
PrintToLog("SmokeTest:: Ping failed, timeout of 3 minutes")
PrintToLog("SmokeTest:: ##### case 0 : distributed test start #####")
execute_path = os.path.normpath(os.path.join(args.tools_path, "DistributedTest"))
PrintToLog("SmokeTest:: execute_path {}".format(execute_path))
os.system("cd {} && python main.py run -l DistributedTest".format(execute_path))
distributed_result = ""
try:
with open(report_path, mode='r', encoding='utf-8', errors='ignore') as f:
f.seek(0)
distributed_result = f.read()
f.close()
except Exception as reason:
PrintToLog("SmokeTest:: report.txt is not exist!")
if "distributedcalc" in distributed_result:
picture_save(picture_path)
PrintToLog("SmokeTest:: testcase 0, distributed is ok!")
else:
picture_save(picture_path)
PrintToLog("SmokeTest:: error:testcase 0, distributed failed!")
EnterShellCmd("ifconfig eth0 down", 1)
try:
args.test_num.index('/')
idx_total = args.test_num.split('/')
if len(idx_total) != 2:
PrintToLog("SmokeTest:: test_num is invaild !!!")
SysExit()
elif idx_total[1] == '1':
idx_list = list(range(1, len(all_app)))
else:
idx_list = global_pos['DEVICE_{}'.format(idx_total[0])]
except ValueError as e:
PrintToLog(e)
idx_list = list(map(eval, args.test_num.split()))
PrintToLog("SmokeTest:: start to carry out the following testcases: ")
PrintToLog("SmokeTest:: testcase number: {} ".format(idx_list))
fail_idx_list = []
fail_name_list = []
smoke_first_failed = ''
for idx in idx_list:
single_app = all_app[idx]
sys.stdout.flush()
call_app_cmd = single_app['entry']
capture_screen_cmd = "/data/screen_test/printscreen -f /data/screen_test/{}_{}"
cmp_cmd = "cmp -l /data/screen_test/{}_{} /data/screen_test/train_set/{} | wc -l"
PrintToLog("\nSmokeTest:: ##### case {} : {} test start #####".format(idx, single_app['app_name']))
with open(os.path.join(args.save_path, 'test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_f:
cmd_f.write("\nSmokeTest::::::case {} --- {} test start \n".format(idx, single_app['app_name']))
cmd_f.close()
testcnt = 3
while testcnt:
testok = 0
if testcnt != 3:
PrintToLog("SmokeTest:: this testcase try again >>>>>>:\n")
with open(os.path.join(args.save_path, 'test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_f:
cmd_f.write("\nSmokeTest::::::Last failed, try again \n")
cmd_f.close()
if idx == 2 or idx == 8:
testcnt = 1
if single_app['entry'] != "":
EnterShellCmd(call_app_cmd, 5)
PrintToLog("SmokeTest:: execute command {}".format(single_app['all_actions']))
raw_pic_name = ''
pic_name = ''
for single_action in single_app['all_actions']:
if type(single_action[1]) == str and single_action[1] == 'shot_cmd':
if len(single_action) == 3:
pic_name = "{}{}".format(single_action[2], ".png")
raw_pic_name = single_action[2] + ".pngraw"
else:
pic_name = "{}{}".format(single_app['app_name'], ".png")
raw_pic_name = single_app['app_name'] + ".pngraw"
EnterShellCmd("rm /data/screen_test/{}_{}*".format(4 - testcnt, pic_name), 1)
EnterShellCmd(capture_screen_cmd.format(4 - testcnt, pic_name), 1)
GetFileFromDev("/data/screen_test/{}_{}".format(4 - testcnt, pic_name), args.save_path)
next_cmd = ""
elif type(single_action[1]) == str and single_action[1] == 'cmp_cmd-level':
next_cmd = ""
sys.stdout.flush()
EnterShellCmd("rm /data/train_set/{}".format(raw_pic_name), 1)
SendFileToDev(os.path.normpath(os.path.join(args.anwser_path, raw_pic_name)),\
"/data/screen_test/train_set")
new_cmp_cmd = cmp_cmd.format(4 - testcnt, raw_pic_name, raw_pic_name)
if len(single_action) == 3:
tolerance = single_action[2]
else:
tolerance = global_pos['cmp_cmd-level'][1]
PrintToLog("SmokeTest:: start to contrast screenshot")
p = EnterShellCmd(new_cmp_cmd, single_action[0])
num = re.findall(r'[-+]?\d+', p)
PrintToLog("SmokeTest:: contrast pixel difference {}".format(num))
if type(num) == list and len(num) > 0 and int(num[0]) < tolerance and\
p.find('No such file or directory', 0, len(p)) == -1:
if testok == 0:
testok = 1
PrintToLog("SmokeTest:: {} screenshot check is ok!".format(raw_pic_name))
else:
testok = -1
PrintToLog("SmokeTest:: {} screenshot check is abnarmal!".format(raw_pic_name))
sys.stdout.flush()
if testok == 1 or testcnt == 1 or smoke_first_failed != '':
old_name = os.path.normpath(os.path.join(args.save_path, "{}_{}".format(4 - testcnt, pic_name)))
GetFileFromDev("/data/screen_test/{}_{}".format(4 - testcnt, raw_pic_name), args.save_path)
os.system("rename {} {}".format(old_name, pic_name))
os.system("rename {}raw {}raw".format(old_name, pic_name))
raw_pic_name = ''
elif type(single_action[1]) == str and single_action[1] == 'install_hap':
next_cmd = ""
if len(single_action) == 3:
EnterCmd("hdc_std -t {} install \"{}\"".format(args.device_num,\
os.path.normpath(os.path.join(args.tools_path, single_action[2]))))
elif type(single_action[1]) == str and single_action[1] == 'get_file_from_dev':
next_cmd = ""
if len(single_action) == 3:
EnterCmd("hdc_std -t {} file recv \"{}\" \"{}\"".format(args.device_num,\
single_action[2], os.path.normpath(args.save_path)))
elif type(single_action[1]) == str and single_action[1] == 'send_file_to_dev':
next_cmd = ""
if len(single_action) == 4:
EnterCmd("hdc_std -t {} file send \"{}\" \"{}\"".format(args.device_num,\
os.path.normpath(os.path.join(args.tools_path, single_action[2])), single_action[3]))
elif type(single_action[1]) == str and single_action[1] == 'connect_wifi':
next_cmd = ""
ConnectToWifi(args.tools_path)
elif type(single_action[1]) == str and single_action[1] == 'sandbox_path_check':
next_cmd = ""
if sandbox_check("com.ohos.medialibrary.medialibrarydata") == 1 and testok == 1:
testok = 1
else:
testok = -1
elif type(single_action[1]) == str and single_action[1] == 'photo_check':
next_cmd = ""
if ImageCheck("{}\\media_library.db".format(os.path.normpath(args.save_path)), 1) == 1 and testok == 1:
testok = 1
else:
testok = -1
elif type(single_action[1]) == str and single_action[1] == 'process_check':
next_cmd = ""
if len(single_action) == 3:
p = EnterShellCmd("ps -elf", single_action[0])
result = "".join(p)
findsome = result.find(single_action[2], 0, len(result))
if findsome != -1 and testok == 1:
testok = 1
PrintToLog("SmokeTest:: \"{}\" is ok, find process \"{}\"!".format(single_action[1],\
single_action[2]))
else:
testok = -1
PrintToLog("SmokeTest:: \"{}\" failed, not find process \"{}\"!".format(single_action[1],\
single_action[2]))
sys.stdout.flush()
elif type(single_action[1]) == str and single_action[1] == 'process_crash_check':
next_cmd = ""
if len(single_action) == 3:
p = EnterShellCmd("cd /data/log/faultlog/temp && grep \"Process name\" -rnw ./",\
single_action[0])
result = "".join(p)
findsome = result.find(single_action[2], 0, len(result))
if findsome != -1:
testok = -1
PrintToLog("SmokeTest:: \"{}\" error:find fatal crash \"{}\"!".format(single_action[1],\
single_action[2]))
SysExit()
else:
testok = 1
PrintToLog("SmokeTest:: \"{}\" result is ok, not find fatal\
crash \"{}\"!".format(single_action[1], single_action[2]))
sys.stdout.flush()
elif type(single_action[1]) == str:
if single_action[1] not in single_app.keys():
target_ = global_pos[single_action[1]]
else:
target_ = single_app[single_action[1]]
if type(target_[0]) == str:
next_cmd = ""
p = EnterShellCmd(target_[0], single_action[0])
result = "".join(p)
if len(target_) > 1:
findsome = result.find(target_[1], 0, len(result))
if findsome != -1:
testok = 1
PrintToLog("SmokeTest:: \"{}\" check result is ok, find \"{}\"!".format(target_[0],\
target_[1]))
else:
testok = -1
PrintToLog("SmokeTest:: \"{}\" result is not ok, not find \"{}\"!".format(target_[0],\
target_[1]))
sys.stdout.flush()
else:
next_cmd = "uinput -M -m {} {} -c 0".format(target_[0], target_[1])
else:
next_cmd = "uinput -M -m {} {} -c 0".format(single_action[1], single_action[2])
EnterShellCmd(next_cmd, single_action[0])
if testok == 1:
PrintToLog("SmokeTest:: testcase {}, {} is ok!".format(idx, single_app['app_name']))
testcnt = 0
elif testok == -1 and smoke_first_failed == '':
if testcnt == 1:
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
smoke_first_failed = single_app['app_name']
PrintToLog("SmokeTest:: error:testcase {}, {} is failed!".format(idx, single_app['app_name']))
testcnt -= 1
elif testok == -1 and smoke_first_failed != '':
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
PrintToLog("SmokeTest:: error:testcase {}, {} is failed!".format(idx, single_app['app_name']))
testcnt = 0
else:
testcnt = 0
ConnectionJudgment()
PrintToLog("\nSmokeTest:: ########## Second check key processes start ##############")
second_check_lose_process = []
for pname in two_check_process_list:
pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
if process_pid[pname] != pidlist:
if pname in two_check_process_list:
PrintToLog("SmokeTest:: error: pid of %s is different the first check" % pname)
else:
PrintToLog("SmokeTest:: warnning: pid of %s is different the first check" % pname)
elif len(pidlist) != 1:
if pname in two_check_process_list:
PrintToLog("SmokeTest:: error: pid of %s is not only one" % pname)
else:
PrintToLog("SmokeTest:: warnning: pid of %s is not only one" % pname)
except:
second_check_lose_process.append(pname)
if second_check_lose_process:
PrintToLog("SmokeTest:: error: pid of %s is not exist" % pname)
else:
PrintToLog("SmokeTest:: second processes check is ok")
EnterShellCmd("cd /data/log/faultlog/temp && grep \"Process name\" -rnw ./", 1)
EnterShellCmd("cd /data/log/faultlog/faultlogger && grep \"Process name\" -rnw ./", 1)
fail_str_list = [str(x) for x in fail_idx_list]
reboot_test_num = " ".join(fail_str_list)
if len(fail_idx_list) != 0:
PrintToLog("SmokeTest:: failed testcase number: {} ".format(fail_str_list))
PrintToLog("SmokeTest:: check \"reboot\" in reboot.txt".format(args.save_path))
with open(os.path.normpath(os.path.join(args.tools_path, "reboot.txt")), mode='a+') as f:
f.seek(0)
reboot_result = f.read()
f.close()
if len(reboot_result) < 1 and rebootcnt >= 1:
PrintToLog("SmokeTest:: \"reboot\" is not found in the reboot.txt")
PrintToLog("SmokeTest:: the device will reboot and try the failed testcase")
PrintToLog("SmokeTest:: mkdir {}\\reboot".format(args.save_path))
os.system("mkdir {}\\reboot".format(args.save_path))
PrintToLog("SmokeTest:: write \"reboot\" into reboot.txt".format(args.save_path))
with open(os.path.normpath(os.path.join(args.tools_path, "reboot.txt")), mode='w') as f:
f.write("reboot")
f.close()
PrintToLog("SmokeTest:: error: name {}, index {}, failed, rm /data/* and reboot".format(fail_name_list,\
fail_idx_list))
EnterShellCmd("rm -rf /data/* && reboot")
reboot_result_list = EnterCmd("hdc_std list targets", 2)
number = 0
while args.device_num not in reboot_result_list and number < 15:
reboot_result_list = EnterCmd("hdc_std list targets", 2)
number += 1
EnterShellCmd("rm /data/log/hilog/*;hilog -r;hilog -w start -l 400000000 -m none", 1)
py_cmd = os.system("python {}\\resource\\capturescreentest.py --config \
{}\\resource\\app_capture_screen_test_config.json --anwser_path {} \
--save_path {}\\reboot --tools_path {} --device_num {} --test_num \"{}\"".format(args.tools_path, \
args.tools_path, args.anwser_path, args.save_path, args.tools_path, args.device_num, reboot_test_num))
if py_cmd == 0:
sys.exit(0)
elif py_cmd == 98:
sys.exit(98)
elif py_cmd == 99:
sys.exit(99)
else:
sys.exit(101)
else:
PrintToLog("SmokeTest:: error: name {}, index {}, these testcase is failed".format(fail_name_list,\
fail_idx_list))
SysExit()
else:
PrintToLog("SmokeTest:: all testcase is ok")
PrintToLog("SmokeTest:: End of check, test succeeded!")
sys.exit(0)

Binary file not shown.

After

Width:  |  Height:  |  Size: 127 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 352 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 165 KiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 784 KiB

File diff suppressed because one or more lines are too long

Binary file not shown.

View File

@ -0,0 +1,59 @@
####First_check_at_begin__Second_check_at_end__Two_results_must_be_same#####
com.ohos.launcher
render_service
####only_check_these_processes_are_exitst#####
hdf_devmgr
param_watcher
storage_manager
appspawn
hilogd
samgr
storage_daemon
udevd
uinput_inject
multimodalinput
huks_service
memmgrservice
bluetooth_servi
pulseaudio
accessibility
resource_schedu
bgtaskmgr_servi
audio_policy
deviceauth_service
softbus_server
wifi_hal_service
faultloggerd
accountmgr
time_service
distributeddata
useriam
inputmethod_ser
ui_service
distributedfile
netmanager
sensors
media_service
wifi_manager_se
distributedsche
installs
hiview
telephony
camera_service
foundation
hdcd
disp_gralloc_host
light_host
vibrator_host
sensor_host
input_user_host
camera_host
audio_host
wifi_host
usb_host
blue_host
wifi_hal_service
com.ohos.systemui
device_usage_st
power_host

Binary file not shown.

After

Width:  |  Height:  |  Size: 165 KiB

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,39 @@
#!/system/bin/sh
# udhcpc script edited by Tim Riker <Tim@Rikers.org>
[ -z "$1" ] && echo "Error: should be called from udhcpc" && exit 1
RESOLV_CONF="/etc/resolv.conf"
[ -n "$broadcast" ] && BROADCAST="broadcast $broadcast"
[ -n "$subnet" ] && NETMASK="netmask $subnet"
case "$1" in
deconfig)
/system/bin/ifconfig $interface 0.0.0.0
;;
renew|bound)
/system/bin/ifconfig $interface $ip $BROADCAST $NETMASK
if [ -n "$router" ] ; then
echo "deleting routers"
while ./busybox route del default gw 0.0.0.0 dev $interface ; do
:
done
for i in $router ; do
./busybox route add default gw $i dev $interface
done
fi
echo -n > $RESOLV_CONF
[ -n "$domain" ] && echo search $domain >> $RESOLV_CONF
for i in $dns ; do
echo adding dns $i
echo nameserver $i >> $RESOLV_CONF
done
;;
esac
exit 0

View File

@ -0,0 +1,8 @@
country=GB
#update_config=1
#ap_scan=1
network={
ssid="testapold"
psk="passw0rd1!"
}

View File

@ -0,0 +1,130 @@
[
{
"DEVICE_1":[1],
"DEVICE_2":[1],
"return-x-y":[210, 1240],
"recent-x-y":[515, 1240],
"home-x-y":[360, 1240],
"recent_del-x-y":[360, 1170],
"permisson_ok-x-y":[575, 700],
"permisson_no-x-y":[140, 700],
"take_photos-x-y":[360, 1095],
"convert_to_video-x-y":[430, 980],
"convert_to_photos-x-y":[200, 1095],
"photos-x-y":[590, 40],
"stop_video-x-y":[320, 1095],
"shot_cmd":[""],
"remount":["mount -o rw,remount"],
"stop_hilog":["hilog -w stop"],
"cmp_cmd-level":["", 443200],
"get_file_from_dev":[""],
"send_file_to_dev":["", ""]
},
{
"app_name": "crash_check",
"entry": "",
"compress_file_recv":["cd /data/log/faultlog/temp && tar -cf crash_log.tar cppcrash*"],
"crash_check":["ls /data/log/faultlog/temp/ -al | wc -l", ""],
"clear_faultlog":["rm /data/log/faultlog/temp/*"],
"all_actions": [
[1, "remount"], [1, "process_crash_check", "foundation"], [1, "process_crash_check", "render_service"], [1, "process_crash_check", "appspawn"], [1, "compress_file_recv"], [1, "get_file_from_dev", "/data/log/faultlog/temp/crash_log.tar"]
]
},
{
"app_name": "notification_bar",
"entry": "",
"pull_down_cmd":["uinput -T -m 500 0 550 30"],
"swipe_up_cmd":["uinput -T -m 500 500 550 300"],
"compress_log":["cd /data/log/hilog && tar -cf notification_bar.tar *"],
"all_actions": [
[2, "pull_down_cmd"], [2, "pull_down_cmd"], [2, "pull_down_cmd"], [2, "pull_down_cmd"],
[2, "shot_cmd"], [2, "cmp_cmd-level"], [1, "swipe_up_cmd"], [1, "swipe_up_cmd"], [1, "stop_hilog"], [1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/notification_bar.tar"]
]
},
{
"app_name": "wifi_connect",
"entry": "",
"check_ping_baidu":["ping www.baidu.com", "64 bytes from"],
"compress_log":["cd /data/log/hilog && tar -cf wifi_connect_log.tar *"],
"all_actions": [[2, "connect_wifi"], [1, "check_ping_baidu"], [1, "stop_hilog"], [1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/wifi_connect_log.tar"]]
},
{
"app_name": "video_test",
"entry": "",
"mk_test_dir":["mkdir -p /data/app/el1/bundle/public/ohos.acts.multimedia.video.videoplayer/ohos.acts.multimedia.video.videoplayer/assets/entry/resources/rawfile"],
"close_auto_log":["hilog -w stop;rm /data/log/hilog/*"],
"start_video_log":["hilog -r;hilog -Q pidoff;hilog -G 512M;hilog -w start -l 400000000 -m none"],
"start_test":["aa start -a ohos.acts.multimedia.video.videoplayer.MainAbility -b ohos.acts.multimedia.video.videoplayer"],
"check_result":["cd /data/log/hilog && grep -nr 'total cases'", "failure 0,error 0,pass 1"],
"compress_log":["cd /data/log/hilog && tar -cf video_log.tar *"],
"clear_log":["rm /data/log/hilog/*"],
"all_actions": [
[1,"close_auto_log"], [1,"start_video_log"], [2, "install_hap", "vediotest/ActsVideoPlayerJsTest.hap"], [1, "mk_test_dir"], [1, "remount"],
[1, "send_file_to_dev", "vediotest/H264_AAC.mp4", "/data/app/el1/bundle/public/ohos.acts.multimedia.video.videoplayer/ohos.acts.multimedia.video.videoplayer/assets/entry/resources/rawfile/"],
[15, "start_test"], [1, "stop_hilog"], [1, "check_result"], [1, "compress_log"],
[1, "get_file_from_dev", "/data/log/hilog/video_log.tar"], [2, "recent-x-y"], [2, "recent_del-x-y"], [2, "home-x-y"], [2, "recent-x-y"], [2, "recent_del-x-y"]
]
},
{
"app_name": "audio_render",
"entry": "",
"remount":["mount -o rw,remount /"],
"chmodfile1":["chmod 777 /data/audio_renderer_unit_test"],
"audio_render_test":["cd /data && ./audio_renderer_unit_test --gtest_filter=AudioRendererUnitTest.Audio_Renderer_Playback_001 && rm *.xml", "[ PASSED ] 1 test"],
"compress_log":["cd /data/log/hilog && tar -cf audio_render_log.tar *"],
"all_actions": [
[1, "remount"], [1, "send_file_to_dev", "audiotest/audio_renderer_unit_test", "/data/"], [1, "send_file_to_dev", "audiotest/test_44100_2.wav", "/data/"], [2, "chmodfile1"],
[5, "audio_render_test"], [1, "stop_hilog"], [1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/audio_render_log.tar"]
]
},
{
"app_name": "photos",
"entry": "aa start -a com.ohos.photos.MainAbility -b com.ohos.photos",
"compress_log":["cd /data/log/hilog && tar -cf photos_log.tar *"],
"all_actions": [
[2, "shot_cmd"], [2, "recent-x-y"], [2, "recent_del-x-y"], [1, "cmp_cmd-level"], [2, "recent-x-y"], [2, "recent_del-x-y"], [1, "stop_hilog"], [1, "compress_log"],
[1, "get_file_from_dev", "/data/log/hilog/photos_log.tar"], [1, "process_check", "com.ohos.medialibrary.medialibrarydata"]
]
},
{
"app_name": "camera",
"entry": "",
"check_result":["cd /data/log/hilog && grep -nr PreviewOutputCallback", "OnFrameStarted"],
"compress_log":["cd /data/log/hilog && tar -cf camera_log.tar *"],
"close_auto_log":["hilog -w stop;rm /data/log/hilog/*"],
"open_camera_log":["hilog -b X;hilog -b D -T CAMERA;hilog -r"],
"start_camera":["aa start -a com.ohos.camera.MainAbility -b com.ohos.camera"],
"recover_log":["cd data/log/hilog/;hilog -x > camera_log.txt;hilog -b D"],
"all_actions": [
[1, "close_auto_log"], [1, "open_camera_log"], [5, "start_camera"], [2, "take_photos-x-y"], [2, "take_photos-x-y"], [1, "recover_log"], [1, "check_result"], [1, "shot_cmd"],
[2, "recent-x-y"], [2, "recent_del-x-y"], [1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/camera_log.tar"]
]
},
{
"app_name": "jump_to_photos",
"entry": "aa start -a com.ohos.camera.MainAbility -b com.ohos.camera",
"compress_log":["cd /data/log/hilog && tar -cf jump_to_photos_log.tar *"],
"all_actions": [
[3, "convert_to_photos-x-y"], [3, "convert_to_photos-x-y"], [3, "convert_to_photos-x-y"], [2, "photos-x-y"], [2, "photos-x-y"], [2, "photos-x-y"], [2, "shot_cmd"], [2, "recent-x-y"], [2, "recent_del-x-y"],
[1, "stop_hilog"], [1, "compress_log"],[1, "get_file_from_dev", "/data/log/hilog/jump_to_photos_log.tar"]
]
},
{
"app_name": "settings",
"entry": "aa start -a com.ohos.settings.MainAbility -b com.ohos.settings",
"compress_log":["cd /data/log/hilog && tar -cf settings_log.tar *"],
"all_actions": [
[2, "shot_cmd", "settings"], [1, "cmp_cmd-level"], [2, "recent-x-y"], [2, "recent_del-x-y"], [2, "home-x-y"], [2, "recent-x-y"], [2, "recent_del-x-y"], [1, "stop_hilog"],
[1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/settings_log.tar"]
]
},
{
"app_name": "note",
"entry": "aa start -a MainAbility -b com.ohos.note",
"compress_log":["cd /data/log/hilog && tar -cf note_log.tar *"],
"all_actions": [
[2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "permisson_ok-x-y"], [2, "shot_cmd"], [2, "recent-x-y"], [2, "recent_del-x-y"], [2, "home-x-y"],
[2, "recent-x-y"], [2, "recent_del-x-y"], [1, "cmp_cmd-level"], [1, "stop_hilog"], [1, "compress_log"], [1, "get_file_from_dev", "/data/log/hilog/note_log.tar"]
]
}
]

View File

@ -0,0 +1,533 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import parse
import json
import sys
import os
import time
import argparse
import re
import subprocess
import shlex
import datetime
def PrintToLog(str):
time = datetime.datetime.now()
str = "[{}] {}".format(time, str)
print(str)
with open(os.path.join(args.save_path, 'shot_test_{}.log'.format(args.device_num)),\
mode='a', encoding='utf-8') as log_file:
console = sys.stdout
sys.stdout = log_file
print(str)
sys.stdout = console
log_file.close()
def EnterCmd(mycmd, waittime=0, printresult=1):
if mycmd == "":
return
global CmdRetryCnt
CmdRetryCnt = 1
EnterCmdRetry = 2
while EnterCmdRetry:
EnterCmdRetry -= 1
try:
p = subprocess.Popen(mycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result, unused_err = p.communicate(timeout=25)
try:
result=result.decode(encoding="utf-8")
except UnicodeDecodeError:
result=result.decode('gbk', errors='ignore')
break
except Exception as e:
result = 'retry failed again'
PrintToLog(e)
CmdRetryCnt += 1
p.kill()
if printresult == 1:
with open(os.path.join(args.save_path, 'shot_test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_file:
cmd_file.write(mycmd + '\n')
cmd_file.close()
PrintToLog(mycmd)
PrintToLog(result)
sys.stdout.flush()
if waittime != 0:
time.sleep(waittime)
if printresult == 1:
with open(os.path.join(args.save_path, 'shot_test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_file:
cmd_file.write("ping -n {} 127.0.0.1>null\n".format(waittime))
cmd_file.close()
return result
def EnterShellCmd(shellcmd, waittime=0, printresult=1):
if shellcmd == "":
return
cmd = "hdc_std -t {} shell \"{}\"".format(args.device_num, shellcmd)
return EnterCmd(cmd, waittime, printresult)
def SysExit():
EnterShellCmd("cd /data/log/faultlog/temp && tar -cf after_test_crash_log_{}.tar cppcrash*".format(args.device_num))
GetFileFromDev("/data/log/faultlog/temp/after_test_crash_log_{}.tar".format(args.device_num),\
os.path.normpath(args.save_path))
sys.exit(99)
def SendFileToDev(src, dst):
cmd = "hdc_std -t {} file send \"{}\" \"{}\"".format(args.device_num, src, dst)
return EnterCmd(cmd, 1, 1)
def GetFileFromDev(src, dst):
cmd = "hdc_std -t {} file recv \"{}\" \"{}\"".format(args.device_num, src, dst)
return EnterCmd(cmd, 1, 1)
def connection_judgment():
connection_status = EnterCmd("hdc_std list targets", 2)
connection_cnt = 0
while "7001005458323933328a" not in connection_status and connection_cnt < 15:
connection_status = EnterCmd("hdc_std list targets", 2)
connection_cnt += 1
if connection_cnt == 15:
PrintToLog("Device disconnection!!")
PrintToLog("End of check, test failed!")
sys.exit(101)
def ConnectToWifi(tools_path):
EnterShellCmd("mkdir /data/l2tool", 1)
SendFileToDev(os.path.normpath(os.path.join(tools_path, "l2tool/busybox")), "/data/l2tool/")
SendFileToDev(os.path.normpath(os.path.join(tools_path, "l2tool/dhcpc.sh")), "/data/l2tool/")
SendFileToDev(os.path.normpath(os.path.join(tools_path, "l2tool/wpa_supplicant.conf")), "/data/l2tool/")
EnterShellCmd("wpa_supplicant -B -d -i wlan0 -c /data/l2tool/wpa_supplicant.conf", 1)
EnterShellCmd("chmod 777 ./data/l2tool/busybox", 1)
cnt = 2
while cnt:
try:
PrintToLog("hdc_std shell ./data/l2tool/busybox udhcpc -i wlan0 -s /data/l2tool/dhcpc.sh")
p = subprocess.check_output(shlex.split("hdc_std -t {} shell ./data/l2tool/busybox udhcpc -i\
wlan0 -s /data/l2tool/dhcpc.sh".format(args.device_num)), timeout=8)
PrintToLog(p.decode(encoding="utf-8"))
with open(os.path.join(args.save_path, 'shot_test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_file:
cmd_file.write('hdc_std shell ./data/l2tool/busybox udhcpc -i wlan0 -s /data/l2tool/dhcpc.sh' + '\n')
cmd_file.close()
ret_code = 0
except subprocess.TimeoutExpired as time_e:
PrintToLog(time_e)
ret_code = 1
if ret_code == 0:
ip = re.findall(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", p.decode(encoding="utf-8"))
PrintToLog(ip)
if len(ip) <= 0:
break
if len(re.findall(r'(?<!\d)\d{1,3}\.\d{1,3}\.\d{1,3}(?=\.\d)', ip[0])) <= 0:
break
gate = str(re.findall(r'(?<!\d)\d{1,3}\.\d{1,3}\.\d{1,3}(?=\.\d)', ip[0])[0]) + ".1"
PrintToLog(gate)
ifconfig="ifconfig wlan0 {}".format(ip[0])
EnterShellCmd(ifconfig)
routeconfig="./data/l2tool/busybox route add default gw {} dev wlan0".format(gate)
EnterShellCmd(routeconfig)
break
PrintToLog("try {}".format(cnt))
cnt -= 1
time.sleep(5)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument('--config', type=str, default = './app_capture_screen_test_config.json')
parser.add_argument('--test_num', type=str, default = '1/1')
parser.add_argument('--tools_path', type=str, default = 'D:\\DeviceTestTools\\screenshot\\')
parser.add_argument('--anwser_path', type=str, default = 'D:\\DeviceTestTools\\screenshot\\resource')
parser.add_argument('--save_path', type=str, default = 'D:\\DeviceTestTools\\screenshot')
parser.add_argument('--device_num', type=str, default = 'null')
parser.add_argument('--pr_url', type=str, default = 'https://gitee.com/openharmony/applications_sample_wifi_iot/')
args = parser.parse_args()
if args.device_num == 'null':
result = EnterCmd("hdc_std list targets", 1, 0)
print(result)
args.device_num = result.split()[0]
with open(args.config) as f:
all_app = json.load(f)
cmp_status = 0
global_pos = all_app[0]
#rmlock
rebootcnt = 2
while rebootcnt:
rebootcnt -= 1
EnterCmd("hdc_std list targets", 1)
EnterShellCmd("mkdir -p /data/screen_test/train_set")
SendFileToDev(os.path.normpath(os.path.join(args.tools_path, "resource/printscreen")), "/data/screen_test/")
EnterShellCmd("chmod 777 /data/screen_test/printscreen")
rmlockcnt = 5
while rmlockcnt:
EnterShellCmd("uinput -T -m 425 1000 425 400;power-shell wakeup;uinput -T -m 425 400 425 1000;\
power-shell setmode 602;uinput -T -m 425 1000 425 400;", 1)
rmlockcnt -= 1
EnterShellCmd("hilog -w stop", 1)
EnterShellCmd("cd /data/log/hilog && tar -cf system_start_log_{}.tar *".format(args.device_num), 1)
GetFileFromDev("/data/log/hilog/system_start_log_{}.tar".format(args.device_num), args.save_path)
#print(os.path.normpath(os.path.join(args.anwser_path, "launcher.pngraw")))
SendFileToDev(os.path.normpath(os.path.join(args.anwser_path, "launcher.pngraw")),\
"/data/screen_test/train_set")
EnterShellCmd("/data/screen_test/printscreen -f /data/screen_test/launcher_{}.png".format(args.device_num), 1)
GetFileFromDev("/data/screen_test/launcher_{}.pngraw".format(args.device_num), args.save_path)
GetFileFromDev("/data/screen_test/launcher_{}.png".format(args.device_num), args.save_path)
connection_judgment()
cmp_launcher = "cmp -l /data/screen_test/launcher_{}.pngraw\
/data/screen_test/train_set/launcher.pngraw | wc -l".format(args.device_num)
p = EnterShellCmd(cmp_launcher, 1)
num = re.findall(r'[-+]?\d+', p)
PrintToLog(num)
if type(num) == list and len(num) > 0 and int(num[0]) < 184320 and\
p.find('No such file or directory', 0, len(p)) == -1:
PrintToLog("remove lock is ok!\n\n")
break
elif rebootcnt >= 1:
PrintToLog("remove lock failed, reboot and try!!!\n\n")
EnterShellCmd("rm -rf /data/*;reboot")
for i in range(5):
EnterCmd("hdc_std list targets", 10)
else:
PrintToLog("ERROR: remove lock failed\n\n")
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
SysExit()
PrintToLog("\n\n########## First check key processes start ##############")
lose_process = []
process_pid = {}
with open(os.path.normpath(os.path.join(args.tools_path, "resource/process.txt")), "r+") as f:
text = f.read()
two_check_process_list = text.split('#####')[1].split()[0:-1]
other_process_list = text.split('#####')[2].split()
#for pname in two_check_process_list + other_process_list:
for pname in two_check_process_list:
pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
int(pidlist[0])
for pid in pidlist:
int(pid)
process_pid[pname] = pidlist
except:
lose_process.append(pname)
all_p = EnterShellCmd("ps -elf")
for pname in other_process_list:
findp = all_p.find(pname, 0, len(all_p))
if findp == -1:
lose_process.append(pname)
if lose_process:
PrintToLog("\n\nERROR: %s, These processes are not exist!!!\n" % lose_process)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
SysExit()
else:
PrintToLog("First processes check is ok\n")
try:
args.test_num.index('/')
idx_total = args.test_num.split('/')
if len(idx_total) != 2:
PrintToLog("test_num is invaild !!!")
PrintToLog("SmokeTest find some key problems!")
PrintToLog("End of check, test failed!")
sys.exit(98)
elif idx_total[1] == '1':
idx_list = list(range(1, len(all_app)))
else:
idx_list = global_pos['DEVICE_{}'.format(idx_total[0])]
except ValueError as e:
PrintToLog(e)
idx_list = list(map(eval, args.test_num.split()))
PrintToLog(idx_list)
fail_idx_list = []
fail_name_list = []
smoke_first_failed = ''
for idx in idx_list:
single_app = all_app[idx]
sys.stdout.flush()
call_app_cmd = single_app['entry']
capture_screen_cmd = "/data/screen_test/printscreen -f /data/screen_test/{}_{}"
cmp_cmd = "cmp -l /data/screen_test/{}_{} /data/screen_test/train_set/{} | wc -l"
PrintToLog("\n\n########## case {} : {} test start ##############".format(idx, single_app['app_name']))
with open(os.path.join(args.save_path, 'shot_test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_file:
cmd_file.write("\n\n::::::case {} --- {} test start \n".format(idx, single_app['app_name']))
cmd_file.close()
testcnt = 3
while testcnt:
testok = 0
checkok = 1
if testcnt != 3:
PrintToLog(">>>>>>>>>>>>>>>>>>>>>>>Try again:\n")
with open(os.path.join(args.save_path, 'shot_test_{}.bat'.format(args.device_num)),\
mode='a', encoding='utf-8') as cmd_file:
cmd_file.write("\n::::::Last failed, Try again \n")
cmd_file.close()
EnterShellCmd("rm /data/log/hilog/*;hilog -r;hilog -w start -l 400000000 -m none", 1)
if single_app['entry'] != "":
EnterShellCmd(call_app_cmd, 5)
PrintToLog(single_app['all_actions'])
raw_pic_name = ''
pic_name = ''
for single_action in single_app['all_actions']:
#shot_cmd is stable, different to other cmd,so handle it specialy
if type(single_action[1]) == str and single_action[1] == 'shot_cmd':
if len(single_action) == 3:
pic_name = "{}{}".format(single_action[2], ".png")
raw_pic_name = single_action[2] + ".pngraw"
else:
pic_name = "{}{}".format(single_app['app_name'], ".png")
raw_pic_name = single_app['app_name'] + ".pngraw"
EnterShellCmd("rm /data/screen_test/{}_{}*".format(4 - testcnt, pic_name), 1)
EnterShellCmd(capture_screen_cmd.format(4 - testcnt, pic_name), 1)
GetFileFromDev("/data/screen_test/{}_{}".format(4 - testcnt, pic_name), args.save_path)
next_cmd = ""
#cmp_cmd-level is stable, different to other cmd,so handle it specialy
elif type(single_action[1]) == str and single_action[1] == 'cmp_cmd-level':
next_cmd = ""
sys.stdout.flush()
EnterShellCmd("rm /data/train_set/{}".format(raw_pic_name), 1)
SendFileToDev(os.path.normpath(os.path.join(args.anwser_path, raw_pic_name)),\
"/data/screen_test/train_set")
new_cmp_cmd = cmp_cmd.format(4 - testcnt, raw_pic_name, raw_pic_name)
if len(single_action) == 3:
tolerance = single_action[2]
else:
tolerance = global_pos['cmp_cmd-level'][1]
p = EnterShellCmd(new_cmp_cmd, single_action[0])
#no_such = re.findall(r'No such file or directory', p)
#PrintToLog(no_such)
num = re.findall(r'[-+]?\d+', p)
PrintToLog(num)
if type(num) == list and len(num) > 0 and int(num[0]) < tolerance and\
p.find('No such file or directory', 0, len(p)) == -1:
if testok == 0:
testok = 1
PrintToLog("{} screenshot check is ok!\n\n".format(raw_pic_name))
else:
testok = -1
PrintToLog("{} screenshot check is abnarmal!\n\n".format(raw_pic_name))
sys.stdout.flush()
if testok == 1 or testcnt == 1 or smoke_first_failed != '':
old_name = os.path.normpath(os.path.join(args.save_path, "{}_{}".format(4 - testcnt, pic_name)))
GetFileFromDev("/data/screen_test/{}_{}".format(4 - testcnt, raw_pic_name), args.save_path)
os.system("rename {} {}".format(old_name, pic_name))
os.system("rename {}raw {}raw".format(old_name, pic_name))
raw_pic_name = ''
elif type(single_action[1]) == str and single_action[1] == 'install_hap':
next_cmd = ""
if len(single_action) == 3:
EnterCmd("hdc_std -t {} install \"{}\"".format(args.device_num,\
os.path.normpath(os.path.join(args.tools_path, single_action[2]))))
elif type(single_action[1]) == str and single_action[1] == 'get_file_from_dev':
next_cmd = ""
if len(single_action) == 3:
EnterCmd("hdc_std -t {} file recv \"{}\" \"{}\"".format(args.device_num,\
single_action[2], os.path.normpath(args.save_path)))
elif type(single_action[1]) == str and single_action[1] == 'send_file_to_dev':
next_cmd = ""
if len(single_action) == 4:
EnterCmd("hdc_std -t {} file send \"{}\" \"{}\"".format(args.device_num,\
os.path.normpath(os.path.join(args.tools_path, single_action[2])), single_action[3]))
elif type(single_action[1]) == str and single_action[1] == 'connect_wifi':
next_cmd = ""
ConnectToWifi(args.tools_path)
elif type(single_action[1]) == str and single_action[1] == 'process_check':
next_cmd = ""
if len(single_action) == 3:
p = EnterShellCmd("ps -elf", single_action[0])
result = "".join(p)
findsome = result.find(single_action[2], 0, len(result))
if findsome != -1:
checkok = 1
PrintToLog("\"{}\" check execut result is ok, find\
process \"{}\"!\n".format(single_action[1], single_action[2]))
else:
checkok = -1
PrintToLog("\"{}\" check execut result is not ok, not find\
process \"{}\"!\n".format(single_action[1], single_action[2]))
sys.stdout.flush()
#process_crash_check
elif type(single_action[1]) == str and single_action[1] == 'process_crash_check':
next_cmd = ""
if len(single_action) == 3:
p = EnterShellCmd("cd /data/log/faultlog/temp && grep \"Process name\" -rnw ./",\
single_action[0])
result = "".join(p)
findsome = result.find(single_action[2], 0, len(result))
if findsome != -1:
testok = -1
PrintToLog("\"{}\" ERROR:find fatal crash \"{}\"!\n".format(single_action[1],\
single_action[2]))
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
SysExit()
else:
testok = 1
PrintToLog("\"{}\" check execut result is ok, not find fatal\
crash \"{}\"!\n".format(single_action[1], single_action[2]))
sys.stdout.flush()
#other cmd handle
elif type(single_action[1]) == str:
if single_action[1] not in single_app.keys():
target_ = global_pos[single_action[1]]
else:
target_ = single_app[single_action[1]]
#this cmd is real cmd,and have a except answer
if type(target_[0]) == str:
next_cmd = ""
p = EnterShellCmd(target_[0], single_action[0])
result = "".join(p)
if len(target_) > 1:
findsome = result.find(target_[1], 0, len(result))
if findsome != -1:
testok = 1
PrintToLog("\"{}\" check execut result is ok, find \"{}\"!\n".format(target_[0],\
target_[1]))
else:
testok = -1
PrintToLog("\"{}\" check execut result is not ok, not find \"{}\"!\n".format(target_[0],\
target_[1]))
sys.stdout.flush()
#this cmd only is a name of x,y postion, to get x,y an click it
else:
next_cmd = "uinput -M -m {} {} -c 0".format(target_[0], target_[1])
#uinput x,y postion, to click it
else:
next_cmd = "uinput -M -m {} {} -c 0".format(single_action[1], single_action[2])
EnterShellCmd(next_cmd, single_action[0])
if testok == 1 and checkok == 1:
PrintToLog("testcase {}, {} is ok!\n\n".format(idx, single_app['app_name']))
testcnt = 0
elif testok == 1 and checkok == -1:
if testcnt == 1:
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
smoke_first_failed = single_app['app_name']
PrintToLog("ERROR:testcase {}, {} is failed!\n\n".format(idx, single_app['app_name']))
testcnt -= 1
elif testok == -1 and smoke_first_failed == '':
#PrintToLog("ERROR:testcase {}, {} is failed!\n\n".format(idx, single_app['app_name']))
if testcnt == 1:
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
smoke_first_failed = single_app['app_name']
PrintToLog("ERROR:testcase {}, {} is failed!\n\n".format(idx, single_app['app_name']))
testcnt -= 1
elif testok == -1 and smoke_first_failed != '':
fail_idx_list.append(idx)
fail_name_list.append(single_app['app_name'])
PrintToLog("ERROR:testcase {}, {} is failed!\n\n".format(idx, single_app['app_name']))
testcnt = 0
else:
testcnt = 0
EnterShellCmd("hilog -w stop", 1)
connection_judgment()
if smoke_first_failed == 'launcher':
break
#key processes second check, and cmp to first check
PrintToLog("\n\n########## Second check key processes start ##############")
second_check_lose_process = []
#for pname in two_check_process_list + other_process_list:
for pname in two_check_process_list:
pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
try:
pidlist = pids.split()
if process_pid[pname] != pidlist:
if pname in two_check_process_list:
PrintToLog("ERROR: pid of %s is different the first check" % pname)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
SysExit()
else:
PrintToLog("WARNNING: pid of %s is different the first check" % pname)
elif len(pidlist) != 1:
if pname in two_check_process_list:
PrintToLog("ERROR: pid of %s is not only one" % pname)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
SysExit()
else:
PrintToLog("WARNNING: pid of %s is not only one" % pname)
except:
second_check_lose_process.append(pname)
if second_check_lose_process:
PrintToLog("ERROR: pid of %s is not exist" % pname)
PrintToLog("SmokeTest find some fatal problems!")
PrintToLog("End of check, test failed!")
SysExit()
else:
PrintToLog("Second processes check is ok\n")
EnterShellCmd("cd /data/log/faultlog/temp && tar -cf after_test_crash_log_{}.tar cppcrash*".format(args.device_num))
GetFileFromDev("/data/log/faultlog/temp/after_test_crash_log_{}.tar".format(args.device_num),\
os.path.normpath(args.save_path))
EnterShellCmd("cd /data/log/faultlog/temp && find . -name cppcrash*", 2)
EnterShellCmd("cd /data/log/faultlog/temp && grep \"Process name\" -rnw ./", 2)
fail_str_list = [str(x) for x in fail_idx_list]
reboot_test_num = " ".join(fail_str_list)
print(fail_str_list)
if len(fail_idx_list) != 0:
with open(os.path.normpath(os.path.join(args.tools_path, "reboot.txt")), mode='a+') as f:
f.seek(0)
reboot_result = f.read()
f.close()
if len(reboot_result) < 1 and rebootcnt >= 1:
os.system("mkdir {}\\reboot".format(args.save_path))
with open(os.path.normpath(os.path.join(args.tools_path, "reboot.txt")), mode='w') as f:
f.write("reboot")
f.close()
PrintToLog("ERROR: name {}, index {}, these testcase is failed, reboot and try!!".format(fail_name_list,\
fail_idx_list))
EnterShellCmd("rm -rf /data/*;reboot")
reboot_result_list = EnterCmd("hdc_std list targets", 2)
number = 0
while args.device_num not in reboot_result_list and number < 15:
reboot_result_list = EnterCmd("hdc_std list targets", 2)
number += 1
EnterShellCmd("rm /data/log/hilog/*;hilog -r;hilog -w start -l 400000000 -m none", 1)
py_cmd = os.system("python {}\\resource\\capturescreentest.py --config\
{}\\resource\\app_capture_screen_test_config.json --anwser_path {} --save_path {}\\reboot\
--tools_path {} --device_num {} --test_num \"{}\"".format(args.tools_path, args.tools_path,\
args.anwser_path, args.save_path, args.tools_path, args.device_num, reboot_test_num))
if py_cmd == 0:
sys.exit(0)
elif py_cmd == 98:
sys.exit(98)
elif py_cmd == 99:
sys.exit(99)
else:
sys.exit(101)
else:
PrintToLog("ERROR: name {}, index {}, these testcase is failed".format(fail_name_list, fail_idx_list))
PrintToLog("SmokeTest find some key problems!")
PrintToLog("End of check, test failed!")
sys.exit(98)
else:
PrintToLog("All testcase is ok")
PrintToLog("End of check, test succeeded!")
sys.exit(0)

Binary file not shown.

After

Width:  |  Height:  |  Size: 97 KiB

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 149 KiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 163 KiB

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

File diff suppressed because one or more lines are too long

Binary file not shown.

View File

@ -0,0 +1,61 @@
####First_check_at_begin__Second_check_at_end__Two_results_must_be_same#####
com.ohos.launcher
render_service
####only_check_these_processes_are_exitst#####
hdf_devmgr
param_watcher
storage_manager
appspawn
hilogd
samgr
storage_daemon
udevd
uinput_inject
multimodalinput
huks_service
memmgrservice
bluetooth_servi
pulseaudio
accessibility
resource_schedu
bgtaskmgr_servi
audio_policy
deviceauth_service
softbus_server
wifi_hal_service
faultloggerd
accountmgr
time_service
distributeddata
useriam
inputmethod_ser
ui_service
distributedfile
netmanager
battery_stats
sensors
media_service
wifi_manager_se
distributedsche
installs
hiview
telephony
camera_service
thermal
foundation
hdcd
disp_gralloc_host
light_host
vibrator_host
sensor_host
input_user_host
camera_host
audio_host
wifi_host
usb_host
blue_host
wifi_hal_service
com.ohos.systemui
device_usage_st
power_host

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

Binary file not shown.

BIN
figures/arch.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

View File

@ -0,0 +1,142 @@
[toc]
# Rom_analyzer.py
## 功能介绍
基于BUILD.gn、bundle.json、编译产物system_module_info.json、out/{product_name}/packages/phone目录下的编译产物分析各子系统及部件的rom大小。
结果以json与xls格式进行存储其中json格式是必输出的xls格式需要-e参数控制。
## 使用说明
前置条件:
1. 获取整个rom_ram_analyzer目录
1. 对系统进行编译
1. linux平台
1. python3.8及以后
1. 安装requirements
```txt
xlwt==1.3.0
```
命令介绍:
1. `-h`或`--help`命令查看帮助
```shell
> python3 rom_analyzer.py -h
usage: rom_analyzer.py [-h] [-v] -p PROJECT_PATH -j MODULE_INFO_JSON -n PRODUCT_NAME -d PRODUCT_DIR [-o OUTPUT_FILE] [-e EXCEL]
analyze rom size of component.
optional arguments:
-h, --help show this help message and exit
-v, -version show program\'s version number and exit
-p PROJECT_PATH, --project_path PROJECT_PATH
root path of openharmony. eg: -p ~/openharmony
-j MODULE_INFO_JSON, --module_info_json MODULE_INFO_JSON
path of out/{product_name}/packages/phone/system_module_info.json
-n PRODUCT_NAME, --product_name PRODUCT_NAME
product name. eg: -n rk3568
-d PRODUCT_DIR, --product_dir PRODUCT_DIR
subdirectories of out/{product_name}/packages/phone to be counted.eg: -d system -d vendor
-o OUTPUT_FILE, --output_file OUTPUT_FILE
basename of output file, default: rom_analysis_result. eg: demo/rom_analysis_result
-e EXCEL, --excel EXCEL
if output result as excel, default: False. eg: -e True
```
1. 使用示例
```shell
python3 rom_analyzer.py -p ~/nomodify_oh/ -j ../system_module_info.json -n rk3568 -d system -d vendor -d updater -o demo/demo -e True
# ohrootpath of openharmony
# rk3568: product_name, same as out/{product_name}
# demo/demo: path of output file, where the second 'demo' is the basename of output file
# -e Trueoutput result in excel format additionally
```
## 输出格式说明(json)
```json
{
子系统名: {
"size": 整个子系统输出文件的总大小,
"file_count": 整个子系统产生的文件数,
输出文件名: 本文件的大小,
...
},
...
}
```
# ram_analyzer.py
## 功能介绍
基于out/{product_name}/packages/phone下所有cfg文件、out/{product_name}/packages/phone/system/profile下所有xml文件分析各进程及对应部件的ram占用默认取Pss
结果以json与xls格式存储其中json格式是必输出的xls格式需要-e参数控制。
## 使用说明
前置条件:
1. 获取整个rom_ram_analyzer目录
2. hdc可用
2. 设备已连接
3. python3.8及以后
4. 安装requirements
```txt
xlwt==1.3.0
```
5. 准备好相关数据:
1. out/{product_name}/packages/phone下所有cfg文件并将其放置于同一个目录中ps同名文件仅保存一份即可
1. out/{product_name}/packages/phone/system/profile下所有xml文件
6. 运行rom_analyzer.py产生的json结果一份即-o参数对应的文件默认rom_analysis_result.json
命令介绍:
1. 使用`-h`或`--help`查看帮助
```shell
> python .\ram_analyzer.py -h
usage: ram_analyzer.py [-h] [-v] -x XML_PATH -c CFG_PATH [-j ROM_RESULT] -n DEVICE_NUM [-o OUTPUT_FILENAME] [-e EXCEL]
analyze ram size of component
optional arguments:
-h, --help show this help message and exit
-v, -version show program\'s version number and exit
-x XML_PATH, --xml_path XML_PATH
path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile
-c CFG_PATH, --cfg_path CFG_PATH
path of cfg files. eg: -c ./cfgs/
-j ROM_RESULT, --rom_result ROM_RESULT
json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json.eg: -j ./demo/rom_analysis_result.json
-n DEVICE_NUM, --device_num DEVICE_NUM
device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800
-o OUTPUT_FILENAME, --output_filename OUTPUT_FILENAME
base name of output file, default: rom_analysis_result. eg: -o ram_analysis_result
-e EXCEL, --excel EXCEL
if output result as excel, default: False. eg: -e True
```
2. 使用示例:
```shell
python .\ram_analyzer.py -x .\profile\ -c .\init\ -n 7001005458323933328a01fce16d3800 -j .\rom_analysis_result.json -o /demo/demo -e True
# demo/demo: path of output file, where the second 'demo' is the basename of output file
# -e Trueoutput result in excel format additionally
```
## 输出格式说明json
```json
{
进程名:{
"size": 本进程占用内存的大小,
部件名: {
elf文件名: elf文件大小
...
}
...
},
...
}
```

View File

View File

@ -0,0 +1,32 @@
import sys
import typing
import os
from pathlib import Path
from typing import *
class BasicTool:
@classmethod
def find_all_files(cls, folder: str, real_path: bool = True, apply_abs: bool = True, de_duplicate: bool = True,
p_filter: typing.Callable = lambda x: True) -> list:
filepath_list = set()
for root, _, file_names in os.walk(folder):
filepath_list.update(
[os.path.abspath(os.path.realpath(
os.path.join(root, f) if real_path else os.path.join(root, f))) if apply_abs else os.path.relpath(
os.path.realpath(os.path.join(root, f) if real_path else os.path.join(root, f))) for f in file_names
if p_filter(os.path.join(root, f))])
if de_duplicate:
filepath_list = set(filepath_list)
filepath_list = sorted(filepath_list, key=str.lower)
return filepath_list
@classmethod
def get_abs_path(cls, path: str) -> str:
return os.path.abspath(os.path.expanduser(path))
if __name__ == '__main__':
# print(BasicTool.get_abs_path("~/git/.."))
for i in BasicTool.find_all_files(".", apply_abs=False):
print(i)

View File

@ -0,0 +1,157 @@
import os
import json
class GnCommonTool:
"""
处理BUILD.gn文件的通用方法
"""
@classmethod
def is_gn_variable(cls, target: str, has_quote: bool = True):
"""
判断target是否是gn中的变量:
规则如果是有引号的模式则没有引号均认为是变量有引号的情况下如有是"$xxx"的模式则认为xxx是变量如果是无引号模式则只要$开头就认为是变量
b = "xxx"
c = b
c = "${b}"
"$p"
"""
target = target.strip()
if not has_quote:
return target.startswith("$")
if target.startswith('"') and target.endswith('"'):
target = target.strip('"')
if target.startswith("${") and target.endswith("}"):
return True
elif target.startswith("$"):
return True
return False
else:
return True
# 给__find_variables_in_gn用的减少io
__var_val_mem_dict = dict()
@classmethod
def find_variables_in_gn(cls, var_name_tuple: tuple, path: str, stop_tail: str = "home") -> tuple:
"""
同时查找多个gn变量的值
var_name_tuple变量名的tuple变量名应是未经过处理后的
xxx
"${xxx}"
"$xxx"
"""
if os.path.isfile(path):
path = os.path.split(path)[0]
var_val_dict = dict()
not_found_count = len(var_name_tuple)
for var in var_name_tuple:
val = GnCommonTool.__var_val_mem_dict.get(var)
if val is not None:
not_found_count -= 1
var_val_dict[var] = val
while not path.endswith(stop_tail) and not_found_count != 0:
for v in var_name_tuple:
cmd = r"grep -Ern '^( *){} *= *\".*?\"' --include=*.gn* {}| grep -Ev '\$' | head -n 1 | grep -E '\".*\"' -wo".format(
v.strip('"').lstrip("${").rstrip('}'), path)
output = os.popen(cmd).read().strip().strip('"')
if len(output) != 0:
not_found_count -= 1
var_val_dict[v] = output
GnCommonTool.__var_val_mem_dict[v] = output
path = os.path.split(path)[0]
return tuple(var_val_dict.values())
@classmethod
def __find_part_subsystem_from_bundle(cls, gnpath: str, stop_tail: str = "home") -> tuple:
"""
根据BUILD.gn的全路径一层层往上面查找bundle.json文件
并从bundle.json中查找part_name和subsystem
"""
filename = "bundle.json"
part_name = None
subsystem_name = None
if stop_tail not in gnpath:
return part_name, subsystem_name
if os.path.isfile(gnpath):
gnpath = os.path.split(gnpath)[0]
while not gnpath.endswith(stop_tail):
bundle_path = os.path.join(gnpath, filename)
if not os.path.isfile(bundle_path): # 如果该文件不在该目录下
gnpath = os.path.split(gnpath)[0]
continue
with open(bundle_path, 'r', encoding='utf-8') as f:
content = json.load(f)
try:
part_name = content["component"]["name"]
subsystem_name = content["component"]["subsystem"]
except KeyError:
...
finally:
break
part_name = None if (part_name is not None and len(
part_name) == 0) else part_name
subsystem_name = None if (subsystem_name is not None and len(
subsystem_name) == 0) else subsystem_name
return part_name, subsystem_name
@classmethod
def find_part_subsystem(cls, gn_file: str, project_path: str) -> tuple:
"""
查找gn_file对应的part_name和subsystem
如果在gn中找不到就到bundle.json中去找
"""
part_name = None
subsystem_name = None
part_var_flag = False # 标识这个变量从gn中取出的原始值是不是变量
subsystem_var_flag = False
var_list = list()
part_name_pattern = r"part_name *=\s*\S*"
subsystem_pattern = r"subsystem_name *=\s*\S*"
meta_grep_pattern = "grep -E '{}' {} | head -n 1"
part_cmd = meta_grep_pattern.format(part_name_pattern, gn_file)
subsystem_cmd = meta_grep_pattern.format(subsystem_pattern, gn_file)
part = os.popen(part_cmd).read().strip()
if len(part) != 0:
part = part.split('=')[-1].strip()
if GnCommonTool.is_gn_variable(part):
part_var_flag = True
var_list.append(part)
else:
part_name = part.strip('"')
if len(part_name) == 0:
part_name = None
subsystem = os.popen(subsystem_cmd).read().strip()
if len(subsystem) != 0: # 这里是只是看有没有grep到关键字
subsystem = subsystem.split('=')[-1].strip()
if GnCommonTool.is_gn_variable(subsystem):
subsystem_var_flag = True
var_list.append(subsystem)
else:
subsystem_name = subsystem.strip('"')
if len(subsystem_name) == 0:
subsystem_name = None
if part_var_flag and subsystem_var_flag:
part_name, subsystem_name = GnCommonTool.find_variables_in_gn(
tuple(var_list), gn_file, project_path)
elif part_var_flag:
t = GnCommonTool.find_variables_in_gn(
tuple(var_list), gn_file, project_path)[0]
part_name = t if t is not None and len(t) != 0 else part_name
elif subsystem_var_flag:
t = GnCommonTool.find_variables_in_gn(
tuple(var_list), gn_file, project_path)[0]
subsystem_name = t if t is not None and len(
t) != 0 else subsystem_name
if part_name is not None and subsystem_name is not None:
return part_name, subsystem_name
# 如果有一个没有找到就要一层层去找bundle.json文件
t_part_name, t_subsystem_name = cls.__find_part_subsystem_from_bundle(
gn_file, stop_tail=project_path)
if t_part_name is not None:
part_name = t_part_name
if t_subsystem_name is not None:
subsystem_name = t_subsystem_name
return part_name, subsystem_name

View File

@ -0,0 +1,118 @@
import xlwt
from xlwt import Worksheet
import typing
from typing import Optional
from collections.abc import Iterable
class SimpleExcelWriter:
def __init__(self, default_sheet_name: str = "sheet1"):
self.__book = xlwt.Workbook(encoding='utf-8', style_compression=0)
self.__sheet_dict = {
default_sheet_name: self.__book.add_sheet(
sheetname=default_sheet_name, cell_overwrite_ok=True)
}
self.__sheet_pos = {
default_sheet_name: (0, 0) # 记录各个sheet页已经写到什么位置了当前值为还没有写的
}
self.__default_sheet_name = default_sheet_name
# 表头样式
self.__head_style = xlwt.XFStyle()
# 内容样式
self.__content_style = xlwt.XFStyle()
# 字体
font = xlwt.Font()
font.bold = True
# 居中对齐
alignment = xlwt.Alignment()
alignment.horz = xlwt.Alignment.HORZ_CENTER # 水平方向
alignment.vert = xlwt.Alignment.VERT_CENTER # 垂直方向
# 设置背景颜色
pattern = xlwt.Pattern()
pattern.pattern = xlwt.Pattern.SOLID_PATTERN
pattern.pattern_fore_colour = 22 # 背景颜色
self.__head_style.font = font
self.__head_style.alignment = alignment
self.__head_style.pattern = pattern
self.__content_style.alignment = alignment
def __increment_y(self, sheet_name: str, value: int = 1) -> int:
if sheet_name in self.__sheet_pos.keys():
x, y = self.__sheet_pos.get(sheet_name)
y = y + value
self.__sheet_pos[sheet_name] = (x, y)
return y
def __increment_x(self, sheet_name: str, value: int = 1) -> int:
if sheet_name in self.__sheet_pos.keys():
x, y = self.__sheet_pos.get(sheet_name)
x = x + value
self.__sheet_pos[sheet_name] = (x, 0)
return x
def append_line(self, content: list, sheet_name: str = None):
sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
if sheet_name not in self.__sheet_dict.keys():
print("error: sheet name '{}' not exist".format(sheet_name))
return
sheet: Worksheet = self.__sheet_dict.get(sheet_name)
x, y = self.__sheet_pos.get(sheet_name)
for ele in content:
sheet.write(x, y, ele, style=self.__content_style)
y = self.__increment_y(sheet_name)
self.__increment_x(sheet_name)
def write_merge(self, x0: int, y0: int, x1: int, y1: int, content: typing.Any,
sheet_name: str = None):
sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
if sheet_name not in self.__sheet_dict.keys():
print("error: sheet name '{}' not exist".format(sheet_name))
return
sheet: Worksheet = self.__sheet_dict.get(sheet_name)
sheet.write_merge(x0, x1, y0, y1, content, style=self.__content_style)
def set_sheet_header(self, headers: Iterable, sheet_name: str = None):
"""
给sheet页设置表头
"""
sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
if sheet_name not in self.__sheet_dict.keys():
print("error: sheet name '{}' not exist".format(sheet_name))
return
x, y = self.__sheet_pos.get(sheet_name)
if x != 0 or y != 0:
print(
"error: pos of sheet '{}' is not (0,0). set_sheet_header must before write".format(sheet_name))
return
sheet: Worksheet = self.__sheet_dict.get(sheet_name)
for h in headers:
sheet.write(x, y, h, self.__head_style)
y = self.__increment_y(sheet_name)
self.__increment_x(sheet_name)
def add_sheet(self, sheet_name: str, cell_overwrite_ok=True) -> Optional[xlwt.Worksheet]:
if sheet_name in self.__sheet_dict.keys():
print("error: sheet name '{}' has exist".format(sheet_name))
return
self.__sheet_dict[sheet_name] = self.__book.add_sheet(
sheetname=sheet_name, cell_overwrite_ok=cell_overwrite_ok)
self.__sheet_pos[sheet_name] = (0, 0)
return self.__sheet_dict.get(sheet_name)
def save(self, file_name: str):
self.__book.save(file_name)
if __name__ == '__main__':
writer = SimpleExcelWriter(default_sheet_name="first")
writer.add_sheet("second")
writer.add_sheet("third")
writer.set_sheet_header(["h", "m", "n"])
writer.append_line([1, 2, 3])
writer.append_line([2, 3, 4], "second")
writer.append_line([3, 4, 5], "third")
writer.append_line([3, 2, 1])
writer.save("demo.xls")

View File

@ -0,0 +1,427 @@
import argparse
import copy
import glob
import json
import os
import re
import sys
import subprocess
import typing
import xml.dom.minidom as dom
from packages.simple_excel_writer import SimpleExcelWriter
debug = True if sys.gettrace() else False
class HDCTool:
@classmethod
def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool:
"""
验证hdc是否可用
True可用
False不可用
"""
cp = subprocess.run(["hdc"], capture_output=True)
stdout = str(cp.stdout)
stderr = str(cp.stderr)
return verify_str in stdout or verify_str in stderr
@classmethod
def verify_device(cls, device_num: str) -> bool:
"""
验证设备是否已经连接
True已连接
False未连接
"""
cp = subprocess.run(["hdc", "list", "targets"], capture_output=True)
stdout = str(cp.stdout)
stderr = str(cp.stderr)
return device_num in stderr or device_num in stdout
__MODE = typing.Literal["stdout", "stderr"]
@classmethod
def exec(cls, args: list, output_from: __MODE = "stdout"):
cp = subprocess.run(args, capture_output=True)
if output_from == "stdout":
return cp.stdout.decode()
elif output_from == "stderr":
return cp.stderr.decode()
else:
print("error: 'output_from' must be stdout or stdin")
def delete_values_from_dict(target_dict: typing.Dict, key_list: typing.Iterable):
for k in key_list:
if k not in target_dict.keys():
continue
del target_dict[k]
class RamAnalyzer:
@classmethod
def __hidumper_mem_line_process(cls, content: typing.Text) -> typing.List[typing.Text]:
"""
将hidumper的拥有的数据行进行分割得到
[pid, name, pss, vss, rss, uss]格式的list
"""
trival_pattern = re.compile(r"kB|\(.*\)(?#去除单位kB以及小括号内的任意数据包括小括号)")
content = re.sub(trival_pattern, "", content)
blank_pattern = re.compile(r"\s+(?#匹配一个或多个空格)")
return re.sub(blank_pattern, ' ', content.strip()).split()
__SS_Mode = typing.Literal["Pss", "Vss", "Rss", "Uss"] # 提示输入
__ss_dict: typing.Dict[str, int] = {
"Pss": 2,
"Vss": 3,
"Rss": 4,
"Uss": 5
}
@classmethod
def __parse_hidumper_mem(cls, content: typing.Text, device_num: str, ss: __SS_Mode = "Pss") -> typing.Dict[
typing.Text, int]:
"""
解析hidumper --meme的结果
返回{process_name: pss}形式的字典
'248 samgr 1464(0 in SwapPss) kB 15064 kB 6928 kB 1072 kB\r'
"""
def find_full_process_name(hname: str) -> str:
for lname in __process_name_list:
if lname.startswith(hname):
return lname
def process_ps_ef(content: str) -> list:
line_list = content.strip().split("\n")[1:]
process_name_list = list()
for line in line_list:
process_name = line.split()[7]
if process_name.startswith('['):
# 内核进程
continue
process_name_list.append(process_name)
return process_name_list
if ss not in cls.__ss_dict.keys():
print("error: {} is not a valid parameter".format(ss))
return dict()
output = content.split('\n')
process_pss_dict = dict()
__process_name_list: typing.List[str] = process_ps_ef(
HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"]))
for line in output:
if "Total Memory Usage by Size" in line:
break
if line.isspace():
continue
processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(line)
if not processed[0].isnumeric(): # 如果第一列不是数字pid就过
continue
name = processed[1] # 否则的话就取名字和对应的size
size = int(processed[cls.__ss_dict.get(ss)])
process_pss_dict[find_full_process_name(name)] = size
return process_pss_dict
@classmethod
def process_hidumper_info(cls, device_num: str, ss: __SS_Mode) -> typing.Dict[str, int]:
"""
处理进程名与对应进程大小
"""
def exec_once() -> typing.Dict[str, int]:
stdout = HDCTool.exec(["hdc", "-t", device_num, "shell", "hidumper", "--mem"])
name_size_dict = cls.__parse_hidumper_mem(stdout, device_num, ss)
return name_size_dict
if not HDCTool.verify_hdc():
print("error: Command 'hdc' not found")
return dict()
if not HDCTool.verify_device(device_num):
print("error: {} is inaccessible or not found".format(device_num))
return dict()
return exec_once()
@classmethod
def __parse_process_xml(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]):
"""
解析xml文件结存存入 result_dict中格式{process_name: os_list}
其中so_list中是so的base_name
"""
if not (os.path.isfile(file_path) and file_path.endswith(".xml")):
print("warning: {} not exist or not a xml file".format(file_path))
return
doc = dom.parse(file_path)
info = doc.getElementsByTagName("info")[0]
process = info.getElementsByTagName("process")[0]
process_name = process.childNodes[0].data
result_dict[process_name] = list()
libs = info.getElementsByTagName("loadlibs")[0].getElementsByTagName("libpath")
for lib in libs:
so = lib.childNodes[0].data
result_dict.get(process_name).append(os.path.split(so)[-1])
if debug:
print(process_name, " ", so)
@classmethod
def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, str]]:
"""
利用rom_analyzer.py的分析结果重组成
{file_base_name: {"subsystem_name":subsystem_name, "component_name":component_name}}
的形式
"""
with open(rom_result_json, 'r', encoding='utf-8') as f:
rom_info_dict = json.load(f)
elf_info_dict: typing.Dict[str, typing.Dict[str, str]] = dict()
for subsystem_name in rom_info_dict.keys():
sub_val_dict: typing.Dict[str, typing.Any] = rom_info_dict.get(subsystem_name)
delete_values_from_dict(sub_val_dict, ["size", "file_count"])
for component_name in sub_val_dict.keys():
component_val_dict: typing.Dict[str, str] = sub_val_dict.get(component_name)
delete_values_from_dict(component_val_dict, ["size", "file_count"])
for file_name, size in component_val_dict.items():
file_basename: str = os.path.split(file_name)[-1]
elf_info_dict[file_basename] = {
"subsystem_name": subsystem_name,
"component_name": component_name,
"size": size
}
return elf_info_dict
@classmethod
def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict):
"""
解析cfg因为有的cfg会拉起xml中的进程所以也可能会去解析xml
"""
with open(cfg_path, 'r', encoding='utf-8') as f:
cfg_dict = json.loads(f.read())
services = cfg_dict.get("services")
if services is None:
print("warning: 'services' not in {}".format(cfg_path))
return
for service in services:
process_name = service.get("name")
first, *path_list = service.get("path")
if first.endswith("sa_main"):
# 由sa_main去来起进程
xml_base_name = os.path.split(path_list[0])[-1]
cls.__parse_process_xml(os.path.join(profile_path, xml_base_name), result_dict)
else:
# 直接执行
# process_name = os.path.split(first)[-1]
if result_dict.get(process_name) is None:
result_dict[process_name] = list()
result_dict.get(process_name).append(os.path.split(first)[-1])
@classmethod
def get_process_so_relationship(cls, xml_path: str, cfg_path: str, profile_path: str) -> typing.Dict[
str, typing.List[str]]:
"""
从out/{product_name}/packages/phone/sa_profile/merged_sa查找xml文件并处理得到进程与so的对应关系
"""
# xml_path = os.path.join(product_path, "packages", "phone", "sa_profile", "merged_sa")
# 从merged_sa里面收集
xml_list = glob.glob(xml_path + os.sep + "*[.]xml", recursive=True)
process_elf_dict: typing.Dict[str, typing.List[str]] = dict()
for xml in xml_list:
if debug:
print("parsing: ", xml)
try:
cls.__parse_process_xml(xml, process_elf_dict)
except:
print("parse '{}' failed".format(xml))
finally:
...
# 从system/etc/init/*.cfg中收集如果是sa_main拉起的则从system/profile/*.xml中进行解析
cfg_list = glob.glob(cfg_path + os.sep + "*[.]cfg", recursive=True)
for cfg in cfg_list:
if debug:
print("parsing: ", cfg)
try:
cls.__parse_process_cfg(cfg, profile_path, process_elf_dict)
except:
print("parse '{}' failed".format(cfg))
finally:
...
return process_elf_dict
@classmethod
def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: __SS_Mode):
"""
保存结果到excel中
"""
tmp_dict = copy.deepcopy(data_dict)
writer = SimpleExcelWriter("ram_info")
writer.set_sheet_header(
["process_name", "process_size({}, KB)".format(ss), "component_name", "elf_name", "elf_size(KB)"])
process_start_r = 1
process_end_r = 0
process_c = 0
process_size_c = 1
component_start_r = 1
component_end_r = 0
component_c = 2
for process_name in tmp_dict.keys():
process_val_dict: typing.Dict[str, typing.Dict[str, int]] = tmp_dict.get(process_name)
process_size = process_val_dict.get("size")
delete_values_from_dict(process_val_dict, ["size"])
for component_name, component_val_dict in process_val_dict.items():
elf_count_of_component = len(component_val_dict)
for elf_name, size in component_val_dict.items():
writer.append_line([process_name, process_size, component_name, elf_name, "%.2f" % (size / 1024)])
component_end_r += elf_count_of_component
# 重写component
writer.write_merge(component_start_r, component_c, component_end_r,
component_c, component_name)
component_start_r = component_end_r + 1
process_end_r += elf_count_of_component
writer.write_merge(process_start_r, process_c, process_end_r, process_c, process_name)
writer.write_merge(process_start_r, process_size_c, process_end_r, process_size_c, process_size)
process_start_r = process_end_r + 1
writer.save(filename)
@classmethod
def find_elf_size_from_rom_result(cls, service_name: str, subsystem_name: str, component_name: str,
evaluator: typing.Callable, rom_result_dict: typing.Dict[str, typing.Dict]) -> \
typing.Tuple[
bool, str, str, int]:
"""
全局查找进程的相关elf文件
subsystem_name与component_name可明确指定或为*以遍历整个dict
evaluator评估elf文件的从phone下面开始的路径与service_name的关系评判如何才是找到了
returns: 是否查找到elf文件名部件名size
"""
subsystem_name_list = [subsystem_name] if subsystem_name != "*" else rom_result_dict.keys()
for sn in subsystem_name_list:
sub_val_dict = rom_result_dict.get(sn)
component_name_list = [component_name] if component_name != '*' else sub_val_dict.keys()
for cn in component_name_list:
if cn == "size" or cn == "file_count":
continue
component_val_dict: typing.Dict[str, int] = sub_val_dict.get(cn)
for k, v in component_val_dict.items():
if k == "size" or k == "file_count":
continue
if not evaluator(service_name, k):
continue
return True, os.path.split(k)[-1], cn, v
return False, str(), str(), int()
@classmethod
def analysis(cls, cfg_path: str, xml_path: str, rom_result_json: str, device_num: str,
output_file: str, ss: __SS_Mode, output_excel: bool):
"""
process size subsystem/component so so_size
"""
if not HDCTool.verify_hdc():
print("error: Command 'hdc' not found")
return
if not HDCTool.verify_device(device_num):
print("error: {} is inaccessible or not found".format(device_num))
return
with open(rom_result_json, 'r', encoding='utf-8') as f:
rom_result_dict: typing.Dict = json.loads(f.read())
# 从rom的分析结果中将需要的elf信息重组
so_info_dict: typing.Dict[
str, typing.Dict[str["component_name|subsystem_name|size"], str]] = cls.get_elf_info_from_rom_result(
rom_result_json)
process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(xml_path, cfg_path,
profile_path)
process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(device_num, ss)
result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict()
def get(key: typing.Any, dt: typing.Dict[str, typing.Any]):
for k, v in dt.items():
if k.startswith(key) or key == v[0]:
# 要么uinput_inject的对应key为mmi_uinput_inject。对于此类特殊处理如果service_name找不到但是直接执行的bin等于这个名字也认为找到
return v
for process_name, process_size in process_size_dict.items(): # 从进程出发
if process_name == "init":
_, bin, _, size = cls.find_elf_size_from_rom_result(process_name, "startup", "init",
lambda x, y: os.path.split(y)[
-1].lower() == x.lower(),
rom_result_dict)
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
result_dict[process_name]["init"] = dict()
result_dict[process_name]["init"][bin if len(bin) != 0 else "UNKOWN"] = size
continue
# 如果是hap特殊处理
if (process_name.startswith("com.") or process_name.startswith("ohos.")):
_, hap_name, component_name, size = cls.find_elf_size_from_rom_result(process_name, "*", "*",
lambda x, y: len(
y.split(
'/')) >= 3 and x.lower().startswith(
y.split('/')[2].lower()),
rom_result_dict)
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
result_dict[process_name][component_name] = dict()
result_dict[process_name][component_name][hap_name if len(hap_name) != 0 else "UNKOWN"] = size
continue
so_list: list = get(process_name, process_elf_dict) # 得到进程相关的elf文件list
if so_list is None:
print("warning: process '{}' not found in .xml or .cfg".format(process_name))
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
result_dict[process_name]["UNKOWN"] = dict()
result_dict[process_name]["UNKOWN"]["UNKOWN"] = int()
continue
result_dict[process_name] = dict()
result_dict[process_name]["size"] = process_size
for so in so_list:
unit = so_info_dict.get(so)
if unit is None:
print("warning: '{}' in {} not found in json from rom_analysis".format(so, process_name))
continue
component_name = unit.get("component_name")
so_size = unit.get("size")
if result_dict.get(process_name).get(component_name) is None:
result_dict[process_name][component_name] = dict()
result_dict[process_name][component_name][so] = so_size
base_dir, _ = os.path.split(output_file)
if len(base_dir) != 0 and not os.path.isdir(base_dir):
os.makedirs(base_dir, exist_ok=True)
with open(output_file + ".json", 'w', encoding='utf-8') as f:
f.write(json.dumps(result_dict, indent=4))
if output_excel:
cls.__save_result_as_excel(result_dict, output_file + ".xls", ss)
def get_args():
VERSION = 1.0
parser = argparse.ArgumentParser(
description="analyze ram size of component"
)
parser.add_argument("-v", "-version", action="version",
version=f"version {VERSION}")
parser.add_argument("-x", "--xml_path", type=str, required=True,
help="path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile")
parser.add_argument("-c", "--cfg_path", type=str, required=True,
help="path of cfg files. eg: -c ./cfgs/")
parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json",
help="json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json."
"eg: -j ./demo/rom_analysis_result.json")
parser.add_argument("-n", "--device_num", type=str, required=True,
help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
help="base name of output file, default: rom_analysis_result. eg: -o ram_analysis_result")
parser.add_argument("-e", "--excel", type=bool, default=False,
help="if output result as excel, default: False. eg: -e True")
args = parser.parse_args()
return args
if __name__ == '__main__':
args = get_args()
cfg_path = args.cfg_path
profile_path = args.xml_path
rom_result = args.rom_result
device_num = args.device_num
output_filename = args.output_filename
output_excel = args.excel
RamAnalyzer.analysis(cfg_path, profile_path, rom_result,
device_num=device_num, output_file=output_filename, ss="Pss", output_excel=output_excel)

View File

@ -0,0 +1,184 @@
import argparse
import json
import os
import sys
import typing
from copy import deepcopy
from typing import *
from packages.basic_tool import BasicTool
from packages.gn_common_tool import GnCommonTool
from packages.simple_excel_writer import SimpleExcelWriter
debug = bool(sys.gettrace())
class RomAnalyzer:
@classmethod
def __collect_product_info(cls, system_module_info_json: Text,
project_path: Text) -> Dict[Text, Dict[Text, Text]]:
"""
根据system_module_info.json生成target字典
"""
with open(system_module_info_json, 'r', encoding='utf-8') as f:
product_list = json.loads(f.read())
project_path = BasicTool.get_abs_path(project_path)
product_info_dict: Dict[Text, Dict[Text, Text]] = dict()
for unit in product_list:
dest: List = unit.get("dest")
if dest is None:
print("warning: keyword 'dest' not found in {}".format(system_module_info_json))
continue
label: Text = unit.get("label")
gn_path = component_name = subsystem_name = None
if label:
gn_path = os.path.join(project_path, label.split(':')[0].lstrip('/'), "BUILD.gn")
component_name, subsystem_name = GnCommonTool.find_part_subsystem(gn_path, project_path)
else:
print("warning: keyword 'label' not found in {}".format(unit))
for target in dest:
product_info_dict[target] = {
"component_name": component_name,
"subsystem_name": subsystem_name,
"gn_path": gn_path,
}
return product_info_dict
@classmethod
def __save_result_as_excel(cls, result_dict: dict, output_name: str):
header = ["subsystem_name", "component_name", "output_file", "size(Byte)"]
tmp_dict = deepcopy(result_dict)
excel_writer = SimpleExcelWriter("rom")
excel_writer.set_sheet_header(headers=header)
subsystem_start_row = 1
subsystem_end_row = 0
subsystem_col = 0
component_start_row = 1
component_end_row = 0
component_col = 1
for subsystem_name in tmp_dict.keys():
subsystem_dict = tmp_dict.get(subsystem_name)
subsystem_size = subsystem_dict.get("size")
subsystem_file_count = subsystem_dict.get("file_count")
del subsystem_dict["file_count"]
del subsystem_dict["size"]
subsystem_end_row += subsystem_file_count
for component_name in subsystem_dict.keys():
component_dict: Dict[str, int] = subsystem_dict.get(component_name)
component_size = component_dict.get("size")
component_file_count = component_dict.get("file_count")
del component_dict["file_count"]
del component_dict["size"]
component_end_row += component_file_count
for file_name, size in component_dict.items():
excel_writer.append_line(
[subsystem_name, component_name, file_name, size])
excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
component_name)
component_start_row = component_end_row + 1
excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
subsystem_name)
subsystem_start_row = subsystem_end_row + 1
excel_writer.save(output_name + ".xls")
@classmethod
def __put(cls, unit: typing.Dict[Text, Any], result_dict: typing.Dict[Text, Dict]):
"""
{
subsystem_name:{
component_name: {
file_name: file_size
}
}
}
"""
component_name = "others" if unit.get("component_name") is None else unit.get("component_name")
subsystem_name = "others" if unit.get("subsystem_name") is None else unit.get("subsystem_name")
size = unit.get("size")
relative_filepath = unit.get("relative_filepath")
if result_dict.get(subsystem_name) is None:
result_dict[subsystem_name] = dict()
result_dict[subsystem_name]["size"] = 0
result_dict[subsystem_name]["file_count"] = 0
if result_dict.get(subsystem_name).get(component_name) is None:
result_dict[subsystem_name][component_name] = dict()
result_dict[subsystem_name][component_name]["size"] = 0
result_dict[subsystem_name][component_name]["file_count"] = 0
result_dict[subsystem_name]["size"] += size
result_dict[subsystem_name]["file_count"] += 1
result_dict[subsystem_name][component_name]["size"] += size
result_dict[subsystem_name][component_name]["file_count"] += 1
result_dict[subsystem_name][component_name][relative_filepath] = size
@classmethod
def analysis(cls, system_module_info_json: Text, product_dirs: List[str],
project_path: Text, product_name: Text, output_file: Text, output_execel: bool):
"""
system_module_info_json: json文件
product_dirs要处理的产物的路径列表如["vendor", "system/"]
project_path: 项目根路径
product_name: egrk3568
output_file: basename of output file
"""
project_path = BasicTool.get_abs_path(project_path)
phone_dir = os.path.join(project_path, "out", product_name, "packages", "phone")
product_dirs = [os.path.join(phone_dir, d) for d in product_dirs]
product_info_dict = cls.__collect_product_info(system_module_info_json, project_path) # 所有产物信息
result_dict: Dict[Text:Dict] = dict()
for d in product_dirs:
file_list: List[Text] = BasicTool.find_all_files(d)
for f in file_list:
size = os.path.getsize(f)
relative_filepath = f.replace(phone_dir, "").lstrip(os.sep)
unit: Dict[Text, Any] = product_info_dict.get(relative_filepath)
if unit is None:
unit = {
"relative_filepath": relative_filepath,
}
unit["size"] = size
unit["relative_filepath"] = relative_filepath
cls.__put(unit, result_dict)
output_dir, _ = os.path.split(output_file)
if len(output_dir) != 0:
os.makedirs(output_dir, exist_ok=True)
with open(output_file + ".json", 'w', encoding='utf-8') as f:
f.write(json.dumps(result_dict, indent=4))
if output_execel:
cls.__save_result_as_excel(result_dict, output_file)
def get_args():
VERSION = 2.0
parser = argparse.ArgumentParser(
description=f"analyze rom size of component.\n")
parser.add_argument("-v", "-version", action="version",
version=f"version {VERSION}")
parser.add_argument("-p", "--project_path", type=str, required=True,
help="root path of openharmony. eg: -p ~/openharmony")
parser.add_argument("-j", "--module_info_json", required=True, type=str,
help="path of out/{product_name}/packages/phone/system_module_info.json")
parser.add_argument("-n", "--product_name", required=True, type=str, help="product name. eg: -n rk3568")
parser.add_argument("-d", "--product_dir", required=True, action="append",
help="subdirectories of out/{product_name}/packages/phone to be counted."
"eg: -d system -d vendor")
parser.add_argument("-o", "--output_file", type=str, default="rom_analysis_result",
help="basename of output file, default: rom_analysis_result. eg: demo/rom_analysis_result")
parser.add_argument("-e", "--excel", type=bool, default=False,
help="if output result as excel, default: False. eg: -e True")
args = parser.parse_args()
return args
if __name__ == '__main__':
args = get_args()
module_info_json = args.module_info_json
project_path = args.project_path
product_name = args.product_name
product_dirs = args.product_dir
output_file = args.output_file
output_excel = args.excel
RomAnalyzer.analysis(module_info_json, product_dirs, project_path, product_name, output_file, output_excel)