support lite test framework

Signed-off-by: zhangchunbao <zhangchunbao515@163.com>
This commit is contained in:
zhangchunbao 2021-08-12 11:45:33 +08:00
parent 60f7447451
commit c240635d92
13 changed files with 751 additions and 147 deletions

View File

@ -190,23 +190,6 @@ The Python environment is required.
3. For devices that only support the serial port connection, modify the configuration file as follows:
\[board\_info\] \# Configure development board information.
```
<board_info>
<board_series>hispark</board_series>
<board_type>taurus</board_type>
<board_product>ipcamera</board_product>
<build_command>hb build</build_command>
</board_info>
```
>![](public_sys-resources/icon-note.gif) **NOTE:**
>**board\_series**: development board series. The default value is **hispark**.
>**board\_type**: development board type. The default value is **taurus**.
>**board\_product**: target product. The default value is **ipcamera**.
>**build\_command**: command used for building the test version and test case. The default value is **hb build**.
\[device\] \# Configure the serial port information with the **"ipcamera"** attribute, including the COM port and baud rate.
```
@ -377,4 +360,4 @@ The Python environment is required.
**test\_developertest**
[test\_xdevice](https://gitee.com/openharmony/test_xdevice/blob/master/README.md)
[test\_xdevice](https://gitee.com/openharmony/test_xdevice/blob/master/README.md)

View File

@ -198,24 +198,6 @@ developertest/
3. 仅支持串口的被测设备。
\[board\_info\] \# 开发板配置信息,例如:
```
<board_info>
<board_series>hispark</board_series>
<board_type>taurus</board_type>
<board_product>ipcamera</board_product>
<build_command>hb build</build_command>
</board_info>
```
>![](public_sys-resources/icon-note.gif) **说明:**
>开发板配置信息如下:
>board\_series开发板系列默认hispark。
>board\_type开发板类型默认taurus。
>board\_product目标产品默认ipcamera。
>build\_command测试版本和用例的编译命令默认hb build。
\[device\] \# 配置标签为ipcamera的串口信息COM口和波特率例如
```

View File

@ -21,16 +21,6 @@
<version>false</version>
<!-- whether compile test case, default false -->
<testcase>true</testcase>
<board_info>
<!-- board series, default hispark -->
<board_series>hispark</board_series>
<!-- board type, default taurus(3516dv300), aries(3518ev300), pegasus(3861) -->
<board_type>taurus</board_type>
<!-- product name, default ipcamera(3516dv300/3518ev300), wifiiot(3861) -->
<board_product>ipcamera</board_product>
<!-- build command, default hb build -->
<build_command>hb build</build_command>
</board_info>
</build>
<environment>
<!-- reserved field, configure devices that support HDC connection -->
@ -40,8 +30,7 @@
<sn></sn>
</device>
<!-- configure devices that support serial connection -->
<device type="com"
label="ipcamera">
<device type="com" label="ipcamera">
<serial>
<com></com>
<type>cmd</type>

View File

@ -16,6 +16,7 @@
# limitations under the License.
#
import os
import platform
import subprocess
@ -32,29 +33,75 @@ class BuildLiteManager(object):
"""
log = platform_logger("BuildLiteManager")
def __init__(self):
self.board_series = ""
self.board_type = ""
self.board_product = ""
def __init__(self, project_root_path):
self.project_rootpath = project_root_path
def build_testcases(self, param):
if platform.system() != "Linux":
self.log.info("Windows environment, only use .bin test cases")
return True
current_path = os.getcwd()
os.chdir(self.project_rootpath)
command = []
if param.productform.find("wifiiot") == -1:
command.append("hb")
command.append("build")
command.append("-p")
command.append("%s@hisilicon" % param.productform)
command.append("-b")
command.append("debug")
if param.testsuit != "":
command.append("target=%s" % param.testsuit)
else:
build_script = os.path.abspath(os.path.join(
os.path.dirname(__file__),
"build_lite_testcases.sh"))
print("build_script=%s" % build_script)
command.append(build_script)
command.append("product=%s" % param.productform)
command.append("kernel=liteos_m")
if param.testsuit != "":
command.append("target=%s" % param.testsuit)
self.log.info("build_command: %s" % str(command))
def build_version_and_cases(self):
build_command = "hb build -b debug"
self.log.info("build param:%s" % build_command)
build_result = False
try:
build_result = subprocess.call(build_command) == 0
build_result = subprocess.call(command) == 0
except IOError as exception:
self.log.error("build test case failed, exception=%s" % exception)
if build_result:
self.log.info("build test case successed.")
else:
self.log.info("build test case failed.")
os.chdir(os.path.realpath(current_path))
return build_result
def exec_build_test(self, param_option):
"""
build os lite version and test cases
:param param_option: build param
:return:build success or failed
"""
if platform.system() == "Linux":
return self.build_version_and_cases()
self.log.info("windows environment, only use .bin test cases")
return True
def build_version(self, productform):
current_path = os.getcwd()
os.chdir(self.project_rootpath)
command = []
command.append("hb")
command.append("build")
command.append("-p")
command.append("%s@hisilicon" % productform)
command.append("-f")
self.log.info("build_command: %s" % str(command))
build_result = False
try:
build_result = subprocess.call(command) == 0
except IOError as exception:
self.log.error("build version failed, exception=%s" % exception)
if build_result:
self.log.info("build version successed.")
else:
self.log.info("build version failed.")
os.chdir(os.path.realpath(current_path))
return build_result

View File

@ -0,0 +1,159 @@
#!/bin/bash
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
date +%F' '%H:%M:%S
echo $@
BASE_HOME=$(dirname $(dirname $(dirname $(dirname $(dirname $(cd $(dirname $0); pwd))))))
PRODUCT=""
PLATFORM=""
TARGET=""
KERNEL=""
DEVICECOMPANY="hisilicon"
BUILD_OUTFILE=*_allinone.bin
DIST_DIR=$BASE_HOME/dist
BUILD_TARGETS=""
#BUILD_TARGETS="${BUILD_TARGETS},//utils/native/lite/test/kv_store_hal:KvStoreTest"
usage()
{
echo
echo "USAGE"
echo " ./build_testcase.sh product=PRODUCT [kernel=KERNEL] [devicecompany=DEVICECOMPANY] [target=TARGET]"
echo
echo " product : PRODUCT product name, such as wifiiot_hispark_pegasus"
echo " target : TARGET the target for build, such as //utils/native/lite/test:liteos_m_unittest."
echo " kernel : KERNEL the target for kernel, such as liteos_m"
echo " devicecompany : DEVICECOMPANY the devicecompany for build, such as hisilicon"
echo
exit 1
}
parse_parameter()
{
while [ -n "$1" ]
do
var="$1"
OPTIONS=$(echo ${var%%=*})
PARAM=$(echo ${var#*=})
case "$OPTIONS" in
product) PRODUCT="$PARAM"
;;
target) TARGET="$PARAM"
;;
kernel) KERNEL="$PARAM"
;;
devicecompany) DEVICECOMPANY="$PARAM"
;;
*) usage
break;;
esac
shift
done
if [ "$KERNEL" = "" ];then
echo "kernel is required, for kernel $KERNEL"
usage
fi
if [ "$PRODUCT" = "" ];then
echo "product is required, for product $PRODUCT"
usage
fi
}
build_testcases()
{
cd $BASE_HOME
if [ "$KERNEL" = "liteos_m" ]; then
board_name=${PRODUCT#*_}
out_dir="${BASE_HOME}/out/${board_name}/${PRODUCT}"
suite_root_dir="${out_dir}/suites"
if [ "${BUILD_TARGETS}" != "" ];then
build_liteos_m_targets "acts" $BUILD_TARGETS
else
echo "BUILD_TARGETS=${BUILD_TARGETS}"
fi
if [ -d "$DIST_DIR" ];then
rm -rf $DIST_DIR
fi
fi
}
build_liteos_m_targets()
{
current_name=$1
current_target=$2
tdd_root_dir="${suite_root_dir}/${current_name}"
suite_out_dir="${tdd_root_dir}/testcases"
suite_out_zip="${tdd_root_dir}.zip"
mkdir -p $DIST_DIR
IFS=',' read -r -a array <<< "${current_target}"
echo "--------------------------------------------${array[@]}"
set -e
mkdir -p ${DIST_DIR}/json
for element in ${array[*]}
do
echo "python build.py -p ${PRODUCT}@${DEVICECOMPANY} -f -t xts ${element}"
python build.py -p ${PRODUCT}@${DEVICECOMPANY} -f -t xts ${element}
suite_build_target=$(echo "${element}" | awk -F "[/:]" '{print $NF}')
echo "suite_build_target=${suite_build_target}"
module_list_file=$suite_out_dir/module_info.json
echo "module_list_file=${module_list_file}"
suite_module_name=$(python test/xts/tools/lite/build/utils.py --method_name get_modulename_by_buildtarget --arguments module_list_file=${module_list_file}#build_target=${suite_build_target})
echo "suite_module_name=${suite_module_name}"
subsystem_name=$(python test/xts/tools/lite/build/utils.py --method_name get_subsystem_name --arguments path=${element})
echo "subsystem_name=${subsystem_name}"
python test/xts/tools/lite/build/utils.py --method_name record_testmodule_info --arguments build_target_name=${suite_module_name}#module_name=${suite_module_name}#subsystem_name=${subsystem_name}#suite_out_dir=${DIST_DIR}/json#same_file=True
mkdir -p ${suite_out_dir}/${subsystem_name}
cp -rf ${BASE_HOME}/out/${board_name}/${PRODUCT}/${BUILD_OUTFILE} ${suite_out_dir}/${subsystem_name}/${suite_module_name}.bin
if [ -f "${suite_out_dir}/${subsystem_name}/*.a" ];then
rm -f ${suite_out_dir}/${subsystem_name}/*.a
fi
cp -rf ${tdd_root_dir} ${DIST_DIR}
done
cp -rf ${DIST_DIR}/${current_name} ${suite_root_dir}
rm -rf ${suite_out_dir}/.bin
cp -rf ${DIST_DIR}/json/module_info.json ${suite_out_dir}
cd $suite_root_dir
if [ -f "${suite_out_zip}" ];then
rm -f ${suite_out_zip}
fi
zip -rv ${suite_out_zip} ${current_name}
mkdir -p ${out_dir}/test
mv ${tdd_root_dir}/testcases/module_info.json ${out_dir}/test/
mv ${tdd_root_dir}/testcases/test_component.json ${out_dir}/test/
mv ${tdd_root_dir}/testcases ${out_dir}/test/unittest
cd $BASE_HOME
}
echo $BASE_HOME
parse_parameter $@
build_testcases
date +%F' '%H:%M:%S
echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"

View File

@ -69,22 +69,12 @@ class BuildManager(object):
LOG.info("Test case compilation failed, please modify.")
return build_result
@classmethod
def _compile_all_test_cases(cls, project_root_path):
if BuildTestcases(project_root_path).build_all_testcases():
LOG.info("Test case compilation successed.")
build_result = True
else:
LOG.info("Test case compilation failed, please modify.")
build_result = False
return build_result
def _compile_testcases(self, project_root_path, para):
all_product_list = scan_support_product()
if para.productform not in all_product_list:
from core.build.build_lite_manager import BuildLiteManager
build_lite_manager = BuildLiteManager()
return build_lite_manager.exec_build_test(para)
build_lite_manager = BuildLiteManager(project_root_path)
return build_lite_manager.build_testcases(para)
if para.testsuit != "":
return self._compile_test_cases_by_target(

View File

@ -159,49 +159,26 @@ class BuildTestcases(object):
build_result = False
current_path = os.getcwd()
os.chdir(self.project_rootpath)
command.append("--product-name")
command.append(productform)
command.append("--export-para")
command.append("PYCACHE_ENABLE:true")
# scan standard and large system
all_scan_product_list = scan_support_product()
if productform in all_scan_product_list:
if os.path.exists(BUILD_FILEPATH):
build_command = [BUILD_FILEPATH]
build_command.extend(command)
LOG.info("build_command: %s" % str(build_command))
if subprocess.call(build_command) == 0:
build_result = True
else:
build_result = False
if os.path.exists(BUILD_FILEPATH):
build_command = [BUILD_FILEPATH]
build_command.extend(command)
LOG.info("build_command: %s" % str(build_command))
if subprocess.call(build_command) == 0:
build_result = True
else:
LOG.warning("Error: The %s is not exist" % BUILD_FILEPATH)
build_result = False
else:
build_result = self._execute_build_lite_cmd()
LOG.warning("Error: The %s is not exist" % BUILD_FILEPATH)
os.chdir(current_path)
return build_result
def _execute_build_lite_cmd(self):
if os.path.exists(BUILD_LITE):
temp_user_manager = UserConfigManager()
build_command_config = \
temp_user_manager.get_user_config("build", "board_info")
build_command = [build_command_config.get("build_command", "")]
LOG.info("build_command: %s" % str(build_command))
try:
if subprocess.call(build_command) == 0:
LOG.info("execute build lite command success")
return True
else:
LOG.error("execute build lite command success")
except IOError as exception:
LOG.error("build lite test case failed, exception=%s"
% exception)
else:
LOG.warning("Error: The %s is not exist" % BUILD_LITE)
return False
def build_fuzz_testcases(self, para):
self._delete_testcase_dir(para.productform)
helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py")
@ -246,17 +223,6 @@ class BuildTestcases(object):
command.append(BUILD_TARGET_PLATFORM % productform)
return self._execute_build_command(productform, command)
def build_all_testcases(self):
command = []
if self.is_build_example:
command.append("--gn-args")
command.append("build_example=true")
command.append("--build-target")
command.append("make_test")
command.append("--gn-args")
command.append(BUILD_TARGET_PLATFORM % "all")
return self._execute_build_command(command)
##############################################################################
##############################################################################

View File

@ -30,6 +30,7 @@ from core.command.display import display_help_info
from core.command.display import display_show_info
from core.command.display import show_wizard_mode
from core.config.config_manager import UserConfigManager
from core.utils import is_lite_product
try:
if platform.system() != 'Windows':
@ -238,17 +239,19 @@ class Console(object):
LOG.warning("action is empty.")
return
if "productform" in self.wizard_dic.keys():
productform = self.wizard_dic["productform"]
options.productform = productform
else:
productform = options.productform
if command.startswith(ToolCommandType.TOOLCMD_KEY_HELP):
self._process_command_help(para_list)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_SHOW):
if "productform" in self.wizard_dic:
productform = self.wizard_dic["productform"]
self._process_command_show(para_list, productform)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_GEN):
self._process_command_gen(command, options)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_RUN):
if "productform" in self.wizard_dic:
options.productform = self.wizard_dic["productform"]
self._process_command_run(command, options)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_QUIT):
self._process_command_quit(command)
@ -314,13 +317,23 @@ class Console(object):
@classmethod
def _build_version(cls, product_form):
build_result = True
is_build_version = UserConfigManager().get_user_config_flag(
"build", "version")
project_root_path = sys.source_code_root_path
if project_root_path != "":
if project_root_path == "":
return True
build_result = True
if is_lite_product(product_form, sys.source_code_root_path):
if not is_build_version:
return True
from core.build.build_lite_manager import BuildLiteManager
build_lite_manager = BuildLiteManager(project_root_path)
build_result = build_lite_manager.build_version(product_form)
else:
from core.build.build_manager import BuildManager
build_manager = BuildManager()
is_build_version = UserConfigManager().get_user_config_flag(
"build", "version")
if is_build_version:
build_result = build_manager.build_version(project_root_path,
product_form)

View File

@ -25,8 +25,10 @@ from xdevice import Plugin
from xdevice import get_plugin
from xdevice import platform_logger
from xdevice import Scheduler
from xdevice import DeviceTestType
from core.utils import get_build_output_path
from core.utils import scan_support_product
from core.utils import is_lite_product
from core.common import is_open_source_product
from core.command.parameter import Parameter
from core.testcase.testcase_manager import TestCaseManager
@ -132,6 +134,20 @@ class Run(object):
if scheduler is None:
LOG.error("Can not find the scheduler plugin.")
else:
if is_lite_product(options.productform,
sys.source_code_root_path):
options.testcases_path = options.target_outpath
options.resource_path = os.path.abspath(os.path.join(
sys.framework_root_dir, "..", "resource"))
print(options.testcases_path)
print(options.resource_path)
if options.productform.find("wifiiot") != -1:
scheduler.update_test_type_in_source(".bin",
DeviceTestType.ctest_lite)
scheduler.update_ext_type_in_source("BIN",
DeviceTestType.ctest_lite)
else:
print("productform is not wifiiot")
scheduler.exec_command(command, options)
return
@ -178,26 +194,27 @@ class Run(object):
@classmethod
def get_tests_out_path(cls, product_form):
tests_out_path = UserConfigManager().get_test_cases_dir()
if tests_out_path == "":
testcase_path = UserConfigManager().get_test_cases_dir()
if testcase_path == "":
all_product_list = scan_support_product()
if product_form in all_product_list:
if is_open_source_product(product_form):
tests_out_path = os.path.abspath(os.path.join(
testcase_path = os.path.abspath(os.path.join(
get_build_output_path(product_form),
"packages",
"phone",
"tests"))
else:
tests_out_path = os.path.abspath(os.path.join(
testcase_path = os.path.abspath(os.path.join(
get_build_output_path(product_form),
"packages",
product_form,
"tests"))
else:
tests_out_path = os.path.join(
testcase_path = os.path.join(
get_build_output_path(product_form), "test")
return tests_out_path
LOG.info("testcase_path=%s" % testcase_path)
return testcase_path
@classmethod
def get_coverage_outpath(cls, options):

View File

@ -25,10 +25,28 @@ from xdevice import DeviceTestType
from xdevice import IDriver
from xdevice import Plugin
from xdevice import platform_logger
from xdevice import DeviceLabelType
from xdevice import ComType
from xdevice import ParserType
from xdevice import ShellHandler
from xdevice import ExecuteTerminate
from xdevice import LiteDeviceExecuteCommandError
from xdevice import get_plugin
from xdevice import JsonParser
from xdevice import get_config_value
from xdevice import get_kit_instances
from xdevice import check_result_report
from xdevice import get_device_log_file
from xdevice import get_test_component_version
from xdevice import ParamError
from core.utils import get_filename_extension
from core.testkit.kit_lite import DeployKit
from core.config.config_manager import UserConfigManager
__all__ = ["LiteUnitTest"]
__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"]
LOG = platform_logger("LiteUnitTest")
def get_level_para_string(level_string):
@ -113,14 +131,14 @@ class LiteUnitTest(IDriver):
if self.mnt_cmd == "mount ":
self.log.error("no configure for mount command")
return
filter_result, status, _ = \
self.lite_device.execute_command_with_timeout(
self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, timeout=3)
if "already mounted" in filter_result:
self.log.info("nfs has been mounted")
return
for i in range(0, 2):
if status:
self.log.info("execute mount command success")
@ -266,3 +284,266 @@ class LiteUnitTest(IDriver):
def __result__(self):
pass
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
class CTestDriver(IDriver):
"""
CTest is a test that runs a native test package on given lite device.
"""
config = None
result = ""
error_message = ""
version_cmd = "AT+CSV"
def __init__(self):
self.file_name = ""
def __check_environment__(self, device_options):
if len(device_options) != 1 or \
device_options[0].label != DeviceLabelType.wifiiot:
self.error_message = "check environment failed"
return False
return True
def __check_config__(self, config=None):
del config
self.config = None
def __execute__(self, request):
from xdevice import Variables
try:
self.config = request.config
self.config.device = request.config.environment.devices[0]
if request.config.resource_path:
current_dir = request.config.resource_path
else:
current_dir = Variables.exec_dir
config_file = request.root.source.config_file.strip()
if config_file:
source = os.path.join(current_dir, config_file)
self.file_name = os.path.basename(config_file).split(".")[0]
else:
source = request.root.source.source_string.strip()
self._run_ctest(source=source, request=request)
except (LiteDeviceExecuteCommandError, Exception) as exception:
LOG.error(exception, error_no=getattr(exception, "error_no",
"00000"))
self.error_message = exception
finally:
if request.root.source.test_name.startswith("{"):
report_name = "report"
else:
report_name = get_filename_extension(
request.root.source.test_name)[0]
self.result = check_result_report(request.config.report_path,
self.result,
self.error_message,
report_name)
def _run_ctest(self, source=None, request=None):
if not source:
LOG.error("Error: %s don't exist." % source, error_no="00101")
return
try:
parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
version = get_test_component_version(self.config)
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = self.file_name
parser_instance.product_info.setdefault("Version", version)
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
reset_cmd = self._reset_device(request, source)
self.result = "%s.xml" % os.path.join(request.config.report_path,
"result", self.file_name)
self.config.device.device.com_dict.get(
ComType.deploy_com).connect()
result, _, error = self.config.device.device. \
execute_command_with_timeout(
command=reset_cmd,
case_type=DeviceTestType.ctest_lite,
key=ComType.deploy_com,
timeout=90,
receiver=handler)
device_log_file = get_device_log_file(request.config.report_path,
request.config.device.
__get_serial__())
device_log_file_open = os.open(device_log_file, os.O_WRONLY |
os.O_CREAT | os.O_APPEND, 0o755)
with os.fdopen(device_log_file_open, "a") as file_name:
file_name.write("{}{}".format(
"\n".join(result.split("\n")[0:-1]), "\n"))
file_name.flush()
finally:
self.config.device.device.com_dict.get(ComType.deploy_com).close()
def _reset_device(self, request, source):
json_config = JsonParser(source)
reset_cmd = []
kit_instances = get_kit_instances(json_config,
request.config.resource_path,
request.config.testcases_path)
from xdevice import Scheduler
for (kit_instance, kit_info) in zip(kit_instances,
json_config.get_kits()):
if not isinstance(kit_instance, DeployKit):
continue
if not self.file_name:
self.file_name = get_config_value(
'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
reset_cmd = kit_instance.burn_command
if not Scheduler.is_execute:
raise ExecuteTerminate("ExecuteTerminate", error_no="00300")
kit_instance.__setup__(self.config.device,
source_file=request.root.source.source_file.strip())
reset_cmd = [int(item, 16) for item in reset_cmd]
return reset_cmd
def __result__(self):
return self.result if os.path.exists(self.result) else ""
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
class JSUnitTestLiteDriver(IDriver):
"""
JSUnitTestDriver is a Test that runs a native test package on given device.
"""
def __init__(self):
self.result = ""
self.error_message = ""
self.kits = []
self.config = None
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def _get_driver_config(self, json_config):
bundle_name = get_config_value('bundle-name',
json_config.get_driver(), False)
if not bundle_name:
raise ParamError("Can't find bundle-name in config file.",
error_no="00108")
else:
self.config.bundle_name = bundle_name
ability = get_config_value('ability',
json_config.get_driver(), False)
if not ability:
self.config.ability = "default"
else:
self.config.ability = ability
def __execute__(self, request):
try:
LOG.debug("Start execute xdevice extension JSUnit Test")
self.config = request.config
self.config.device = request.config.environment.devices[0]
config_file = request.root.source.config_file
suite_file = request.root.source.source_file
if not suite_file:
raise ParamError(
"test source '%s' not exists" %
request.root.source.source_string, error_no="00101")
if not os.path.exists(config_file):
LOG.error("Error: Test cases don't exist %s." % config_file,
error_no="00101")
raise ParamError(
"Error: Test cases don't exist %s." % config_file,
error_no="00101")
self.file_name = os.path.basename(
request.root.source.source_file.strip()).split(".")[0]
self.result = "%s.xml" % os.path.join(
request.config.report_path, "result", self.file_name)
json_config = JsonParser(config_file)
self.kits = get_kit_instances(json_config,
self.config.resource_path,
self.config.testcases_path)
self._get_driver_config(json_config)
from xdevice import Scheduler
for kit in self.kits:
if not Scheduler.is_execute:
raise ExecuteTerminate("ExecuteTerminate",
error_no="00300")
if kit.__class__.__name__ == CKit.liteinstall:
kit.bundle_name = self.config.bundle_name
kit.__setup__(self.config.device, request=request)
self._run_jsunit(request)
except Exception as exception:
self.error_message = exception
finally:
report_name = "report" if request.root.source. \
test_name.startswith("{") else get_filename_extension(
request.root.source.test_name)[0]
self.result = check_result_report(
request.config.report_path, self.result, self.error_message,
report_name)
for kit in self.kits:
kit.__teardown__(self.config.device)
self.config.device.close()
def _run_jsunit(self, request):
parser_instances = []
parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = self.file_name
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
command = "./bin/aa start -p %s -n %s" % \
(self.config.bundle_name, self.config.ability)
result, _, error = self.config.device.execute_command_with_timeout(
command=command,
timeout=300,
receiver=handler)
device_log_file = get_device_log_file(request.config.report_path,
request.config.device.
__get_serial__())
device_log_file_open = os.open(device_log_file, os.O_WRONLY |
os.O_CREAT | os.O_APPEND, 0o755)
with os.fdopen(device_log_file_open, "a") as file_name:
file_name.write("{}{}".format(
"\n".join(result.split("\n")[0:-1]), "\n"))
file_name.flush()
def __result__(self):
return self.result if os.path.exists(self.result) else ""

17
src/core/testkit/__init__.py Executable file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

146
src/core/testkit/kit_lite.py Executable file
View File

@ -0,0 +1,146 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
import random
import string
import shutil
import subprocess
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import CKit
from xdevice import ComType
from xdevice import ParamError
from xdevice import LiteDeviceError
from xdevice import ITestKit
from xdevice import get_config_value
from xdevice import get_file_absolute_path
from xdevice import LiteDeviceConnectError
from xdevice import DeviceAllocationState
__all__ = ["DeployKit"]
LOG = platform_logger("KitLite")
RESET_CMD = "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, " \
"0x61, 0x94"
@Plugin(type=Plugin.TEST_KIT, id=CKit.deploy)
class DeployKit(ITestKit):
def __init__(self):
self.burn_file = ""
self.burn_command = ""
self.timeout = ""
self.paths = ""
def __check_config__(self, config):
self.timeout = str(int(get_config_value(
'timeout', config, is_list=False, default=0)) * 1000)
self.burn_file = get_config_value('burn_file', config, is_list=False)
burn_command = get_config_value('burn_command', config, is_list=False,
default=RESET_CMD)
self.burn_command = burn_command.replace(" ", "").split(",")
self.paths = get_config_value('paths', config)
if self.timeout == "0" or not self.burn_file:
msg = "The config for deploy kit is invalid with timeout:{}, " \
"burn_file:{}".format(self.timeout, self.burn_file)
raise ParamError(msg, error_no="00108")
def _reset(self, device):
cmd_com = device.device.com_dict.get(ComType.cmd_com)
try:
cmd_com.connect()
cmd_com.execute_command(command='AT+RST={}'.format(self.timeout))
cmd_com.close()
except (LiteDeviceConnectError, IOError) as error:
device.device_allocation_state = DeviceAllocationState.unusable
LOG.error(
"The exception {} happened in deploy kit running".format(
error), error_no=getattr(error, "error_no",
"00000"))
raise LiteDeviceError("%s port set_up wifiiot failed" %
cmd_com.serial_port,
error_no=getattr(error, "error_no",
"00000"))
finally:
if cmd_com:
cmd_com.close()
def _send_file(self, device, source_file):
burn_tool_name = "HiBurn.exe" if os.name == "nt" else "HiBurn"
burn_tool_path = get_file_absolute_path(
os.path.join("tools", burn_tool_name), self.paths)
deploy_serial_port = device.device.com_dict.get(
ComType.deploy_com).serial_port
deploy_baudrate = device.device.com_dict.\
get(ComType.deploy_com).baud_rate
port_number = re.findall(r'\d+$', deploy_serial_port)
if not port_number:
raise LiteDeviceError("The config of serial port {} to deploy is "
"invalid".format(deploy_serial_port),
error_no="00108")
new_temp_tool_path = self.copy_file_as_temp(burn_tool_path, 10)
cmd = '{} -com:{} -bin:{} -signalbaud:{}' \
.format(new_temp_tool_path, port_number[0], source_file,
deploy_baudrate)
LOG.info('The running cmd is {}'.format(cmd))
LOG.info('The burn tool is running, please wait..')
return_code, out = subprocess.getstatusoutput(cmd)
LOG.info(
'Deploy kit to execute burn tool finished with return_code: {} '
'output: {}'.format(return_code, out))
os.remove(new_temp_tool_path)
if 0 != return_code:
device.device_allocation_state = DeviceAllocationState.unusable
raise LiteDeviceError("%s port set_up wifiiot failed" %
deploy_serial_port, error_no="00402")
def __setup__(self, device, **kwargs):
"""
Execute reset command on the device by cmd serial port and then upload
patch file by deploy tool.
Parameters:
device: the instance of LocalController with one or more
ComController
"""
args = kwargs
source_file = args.get("source_file", None)
self._reset(device)
self._send_file(device, source_file)
def __teardown__(self, device):
pass
def copy_file_as_temp(self, original_file, str_length):
"""
To obtain a random string with specified length
Parameters:
original_file : the original file path
str_length: the length of random string
"""
if os.path.isfile(original_file):
random_str = random.sample(string.ascii_letters + string.digits,
str_length)
new_temp_tool_path = '{}_{}{}'.format(
os.path.splitext(original_file)[0], "".join(random_str),
os.path.splitext(original_file)[1])
return shutil.copyfile(original_file, new_temp_tool_path)

View File

@ -74,9 +74,10 @@ def get_device_log_file(report_path, serial=None, log_name="device_log"):
def get_build_output_path(product_form):
if sys.source_code_root_path == "":
return ""
standard_large_system_list = scan_support_product()
property_info = parse_product_info(product_form)
if product_form in standard_large_system_list:
property_info = parse_product_info(product_form)
if property_info is not None:
target_os = property_info.get("target_os")
target_cpu = property_info.get("target_cpu")
@ -84,14 +85,15 @@ def get_build_output_path(product_form):
else:
return ""
else:
para_dic = UserConfigManager().get_user_config("build", "board_info")
board_series = para_dic.get("board_series", "")
board_type = para_dic.get("board_type", "")
board_product = para_dic.get("board_product", "")
fist_build_output = "%s_%s" % (board_series, board_type)
second_build_output = "%s_%s" % (board_product, fist_build_output)
build_output_name = os.path.join(fist_build_output,
board_info_list = product_form.split("_")
if len(board_info_list) < 3:
return ""
first_build_output = board_info_list[1] + "_" + board_info_list[2]
second_build_output = product_form
build_output_name = os.path.join(first_build_output,
second_build_output)
build_output_path = os.path.join(sys.source_code_root_path,
"out",
build_output_name)
@ -126,6 +128,9 @@ def parse_product_info(product_form):
product_form,
"preloader",
"build.prop")
if not os.path.exists(build_prop):
return {}
with open(build_prop, 'r') as pro_file:
properties = {}
for line in pro_file:
@ -154,6 +159,7 @@ def get_decode(stream):
ret = str(stream)
return ret
def parse_fuzzer_info():
path_list = []
bin_list = []
@ -170,9 +176,17 @@ def parse_fuzzer_info():
bin_list.append(striped_str.split(":")[1].split("(")[0])
return path_list, bin_list
def get_fuzzer_path(filename):
path_list, bin_list = parse_fuzzer_info()
for i, name in enumerate(bin_list):
if name == filename:
return os.path.join(sys.source_code_root_path, path_list[i])
return ""
def is_lite_product(product_form, code_root_path):
if code_root_path is None or code_root_path == "":
return True if len(product_form.split("_")) >= 3 else False
else:
return True if product_form not in scan_support_product() else False