From c240635d92ee4925a4ad50ce4af74fdb60b3c646 Mon Sep 17 00:00:00 2001 From: zhangchunbao Date: Thu, 12 Aug 2021 11:45:33 +0800 Subject: [PATCH] support lite test framework Signed-off-by: zhangchunbao --- README.md | 19 +- README_zh.md | 18 -- config/user_config.xml | 13 +- src/core/build/build_lite_manager.py | 83 +++++-- src/core/build/build_lite_testcases.sh | 159 ++++++++++++++ src/core/build/build_manager.py | 14 +- src/core/build/build_testcases.py | 54 +---- src/core/command/console.py | 29 ++- src/core/command/run.py | 29 ++- src/core/driver/lite_driver.py | 287 ++++++++++++++++++++++++- src/core/testkit/__init__.py | 17 ++ src/core/testkit/kit_lite.py | 146 +++++++++++++ src/core/utils.py | 30 ++- 13 files changed, 751 insertions(+), 147 deletions(-) create mode 100755 src/core/build/build_lite_testcases.sh create mode 100755 src/core/testkit/__init__.py create mode 100755 src/core/testkit/kit_lite.py diff --git a/README.md b/README.md index c2bcff8..d3fecc0 100755 --- a/README.md +++ b/README.md @@ -190,23 +190,6 @@ The Python environment is required. 3. For devices that only support the serial port connection, modify the configuration file as follows: - \[board\_info\] \# Configure development board information. - - ``` - - hispark - taurus - ipcamera - hb build - - ``` - - >![](public_sys-resources/icon-note.gif) **NOTE:** - >**board\_series**: development board series. The default value is **hispark**. - >**board\_type**: development board type. The default value is **taurus**. - >**board\_product**: target product. The default value is **ipcamera**. - >**build\_command**: command used for building the test version and test case. The default value is **hb build**. - \[device\] \# Configure the serial port information with the **"ipcamera"** attribute, including the COM port and baud rate. ``` @@ -377,4 +360,4 @@ The Python environment is required. **test\_developertest** -[test\_xdevice](https://gitee.com/openharmony/test_xdevice/blob/master/README.md) \ No newline at end of file +[test\_xdevice](https://gitee.com/openharmony/test_xdevice/blob/master/README.md) diff --git a/README_zh.md b/README_zh.md index 0586b66..c166984 100755 --- a/README_zh.md +++ b/README_zh.md @@ -198,24 +198,6 @@ developertest/ 3. 仅支持串口的被测设备。 - \[board\_info\] \# 开发板配置信息,例如: - - ``` - - hispark - taurus - ipcamera - hb build - - ``` - - >![](public_sys-resources/icon-note.gif) **说明:** - >开发板配置信息如下: - >board\_series:开发板系列,默认hispark。 - >board\_type:开发板类型,默认taurus。 - >board\_product:目标产品,默认ipcamera。 - >build\_command:测试版本和用例的编译命令,默认hb build。 - \[device\] \# 配置标签为ipcamera的串口信息,COM口和波特率,例如: ``` diff --git a/config/user_config.xml b/config/user_config.xml index c9d2ac6..8434962 100755 --- a/config/user_config.xml +++ b/config/user_config.xml @@ -21,16 +21,6 @@ false true - - - hispark - - taurus - - ipcamera - - hb build - @@ -40,8 +30,7 @@ - + cmd diff --git a/src/core/build/build_lite_manager.py b/src/core/build/build_lite_manager.py index 53758e7..c50272a 100755 --- a/src/core/build/build_lite_manager.py +++ b/src/core/build/build_lite_manager.py @@ -16,6 +16,7 @@ # limitations under the License. # +import os import platform import subprocess @@ -32,29 +33,75 @@ class BuildLiteManager(object): """ log = platform_logger("BuildLiteManager") - def __init__(self): - self.board_series = "" - self.board_type = "" - self.board_product = "" + def __init__(self, project_root_path): + self.project_rootpath = project_root_path + + def build_testcases(self, param): + if platform.system() != "Linux": + self.log.info("Windows environment, only use .bin test cases") + return True + + current_path = os.getcwd() + os.chdir(self.project_rootpath) + + command = [] + if param.productform.find("wifiiot") == -1: + command.append("hb") + command.append("build") + command.append("-p") + command.append("%s@hisilicon" % param.productform) + command.append("-b") + command.append("debug") + if param.testsuit != "": + command.append("target=%s" % param.testsuit) + else: + build_script = os.path.abspath(os.path.join( + os.path.dirname(__file__), + "build_lite_testcases.sh")) + print("build_script=%s" % build_script) + command.append(build_script) + command.append("product=%s" % param.productform) + command.append("kernel=liteos_m") + if param.testsuit != "": + command.append("target=%s" % param.testsuit) + self.log.info("build_command: %s" % str(command)) - def build_version_and_cases(self): - build_command = "hb build -b debug" - self.log.info("build param:%s" % build_command) build_result = False try: - build_result = subprocess.call(build_command) == 0 + build_result = subprocess.call(command) == 0 except IOError as exception: self.log.error("build test case failed, exception=%s" % exception) + + if build_result: + self.log.info("build test case successed.") + else: + self.log.info("build test case failed.") + + os.chdir(os.path.realpath(current_path)) return build_result - def exec_build_test(self, param_option): - """ - build os lite version and test cases - :param param_option: build param - :return:build success or failed - """ - if platform.system() == "Linux": - return self.build_version_and_cases() - self.log.info("windows environment, only use .bin test cases") - return True + def build_version(self, productform): + current_path = os.getcwd() + os.chdir(self.project_rootpath) + command = [] + command.append("hb") + command.append("build") + command.append("-p") + command.append("%s@hisilicon" % productform) + command.append("-f") + self.log.info("build_command: %s" % str(command)) + + build_result = False + try: + build_result = subprocess.call(command) == 0 + except IOError as exception: + self.log.error("build version failed, exception=%s" % exception) + + if build_result: + self.log.info("build version successed.") + else: + self.log.info("build version failed.") + + os.chdir(os.path.realpath(current_path)) + return build_result diff --git a/src/core/build/build_lite_testcases.sh b/src/core/build/build_lite_testcases.sh new file mode 100755 index 0000000..0aec256 --- /dev/null +++ b/src/core/build/build_lite_testcases.sh @@ -0,0 +1,159 @@ +#!/bin/bash +# Copyright (c) 2020-2021 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" +date +%F' '%H:%M:%S +echo $@ + +BASE_HOME=$(dirname $(dirname $(dirname $(dirname $(dirname $(cd $(dirname $0); pwd)))))) +PRODUCT="" +PLATFORM="" +TARGET="" +KERNEL="" +DEVICECOMPANY="hisilicon" +BUILD_OUTFILE=*_allinone.bin +DIST_DIR=$BASE_HOME/dist + +BUILD_TARGETS="" +#BUILD_TARGETS="${BUILD_TARGETS},//utils/native/lite/test/kv_store_hal:KvStoreTest" + + +usage() +{ + echo + echo "USAGE" + echo " ./build_testcase.sh product=PRODUCT [kernel=KERNEL] [devicecompany=DEVICECOMPANY] [target=TARGET]" + echo + echo " product : PRODUCT product name, such as wifiiot_hispark_pegasus" + echo " target : TARGET the target for build, such as //utils/native/lite/test:liteos_m_unittest." + echo " kernel : KERNEL the target for kernel, such as liteos_m" + echo " devicecompany : DEVICECOMPANY the devicecompany for build, such as hisilicon" + echo + exit 1 +} + +parse_parameter() +{ + while [ -n "$1" ] + do + var="$1" + OPTIONS=$(echo ${var%%=*}) + PARAM=$(echo ${var#*=}) + case "$OPTIONS" in + product) PRODUCT="$PARAM" + ;; + target) TARGET="$PARAM" + ;; + kernel) KERNEL="$PARAM" + ;; + devicecompany) DEVICECOMPANY="$PARAM" + ;; + *) usage + break;; + esac + shift + done + + if [ "$KERNEL" = "" ];then + echo "kernel is required, for kernel $KERNEL" + usage + fi + + if [ "$PRODUCT" = "" ];then + echo "product is required, for product $PRODUCT" + usage + fi +} + + +build_testcases() +{ + cd $BASE_HOME + if [ "$KERNEL" = "liteos_m" ]; then + board_name=${PRODUCT#*_} + out_dir="${BASE_HOME}/out/${board_name}/${PRODUCT}" + suite_root_dir="${out_dir}/suites" + if [ "${BUILD_TARGETS}" != "" ];then + build_liteos_m_targets "acts" $BUILD_TARGETS + else + echo "BUILD_TARGETS=${BUILD_TARGETS}" + fi + if [ -d "$DIST_DIR" ];then + rm -rf $DIST_DIR + fi + fi +} + + +build_liteos_m_targets() +{ + current_name=$1 + current_target=$2 + tdd_root_dir="${suite_root_dir}/${current_name}" + suite_out_dir="${tdd_root_dir}/testcases" + suite_out_zip="${tdd_root_dir}.zip" + + mkdir -p $DIST_DIR + IFS=',' read -r -a array <<< "${current_target}" + echo "--------------------------------------------${array[@]}" + set -e + mkdir -p ${DIST_DIR}/json + + for element in ${array[*]} + do + echo "python build.py -p ${PRODUCT}@${DEVICECOMPANY} -f -t xts ${element}" + python build.py -p ${PRODUCT}@${DEVICECOMPANY} -f -t xts ${element} + + suite_build_target=$(echo "${element}" | awk -F "[/:]" '{print $NF}') + echo "suite_build_target=${suite_build_target}" + module_list_file=$suite_out_dir/module_info.json + echo "module_list_file=${module_list_file}" + suite_module_name=$(python test/xts/tools/lite/build/utils.py --method_name get_modulename_by_buildtarget --arguments module_list_file=${module_list_file}#build_target=${suite_build_target}) + echo "suite_module_name=${suite_module_name}" + subsystem_name=$(python test/xts/tools/lite/build/utils.py --method_name get_subsystem_name --arguments path=${element}) + echo "subsystem_name=${subsystem_name}" + python test/xts/tools/lite/build/utils.py --method_name record_testmodule_info --arguments build_target_name=${suite_module_name}#module_name=${suite_module_name}#subsystem_name=${subsystem_name}#suite_out_dir=${DIST_DIR}/json#same_file=True + + mkdir -p ${suite_out_dir}/${subsystem_name} + cp -rf ${BASE_HOME}/out/${board_name}/${PRODUCT}/${BUILD_OUTFILE} ${suite_out_dir}/${subsystem_name}/${suite_module_name}.bin + if [ -f "${suite_out_dir}/${subsystem_name}/*.a" ];then + rm -f ${suite_out_dir}/${subsystem_name}/*.a + fi + cp -rf ${tdd_root_dir} ${DIST_DIR} + done + + cp -rf ${DIST_DIR}/${current_name} ${suite_root_dir} + rm -rf ${suite_out_dir}/.bin + cp -rf ${DIST_DIR}/json/module_info.json ${suite_out_dir} + cd $suite_root_dir + if [ -f "${suite_out_zip}" ];then + rm -f ${suite_out_zip} + fi + zip -rv ${suite_out_zip} ${current_name} + + mkdir -p ${out_dir}/test + mv ${tdd_root_dir}/testcases/module_info.json ${out_dir}/test/ + mv ${tdd_root_dir}/testcases/test_component.json ${out_dir}/test/ + mv ${tdd_root_dir}/testcases ${out_dir}/test/unittest + cd $BASE_HOME +} + +echo $BASE_HOME +parse_parameter $@ +build_testcases + +date +%F' '%H:%M:%S +echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" diff --git a/src/core/build/build_manager.py b/src/core/build/build_manager.py index 29ed810..0419255 100755 --- a/src/core/build/build_manager.py +++ b/src/core/build/build_manager.py @@ -69,22 +69,12 @@ class BuildManager(object): LOG.info("Test case compilation failed, please modify.") return build_result - @classmethod - def _compile_all_test_cases(cls, project_root_path): - if BuildTestcases(project_root_path).build_all_testcases(): - LOG.info("Test case compilation successed.") - build_result = True - else: - LOG.info("Test case compilation failed, please modify.") - build_result = False - return build_result - def _compile_testcases(self, project_root_path, para): all_product_list = scan_support_product() if para.productform not in all_product_list: from core.build.build_lite_manager import BuildLiteManager - build_lite_manager = BuildLiteManager() - return build_lite_manager.exec_build_test(para) + build_lite_manager = BuildLiteManager(project_root_path) + return build_lite_manager.build_testcases(para) if para.testsuit != "": return self._compile_test_cases_by_target( diff --git a/src/core/build/build_testcases.py b/src/core/build/build_testcases.py index 97caf14..929e4d2 100755 --- a/src/core/build/build_testcases.py +++ b/src/core/build/build_testcases.py @@ -159,49 +159,26 @@ class BuildTestcases(object): build_result = False current_path = os.getcwd() os.chdir(self.project_rootpath) + command.append("--product-name") command.append(productform) command.append("--export-para") command.append("PYCACHE_ENABLE:true") - # scan standard and large system - all_scan_product_list = scan_support_product() - if productform in all_scan_product_list: - if os.path.exists(BUILD_FILEPATH): - build_command = [BUILD_FILEPATH] - build_command.extend(command) - LOG.info("build_command: %s" % str(build_command)) - if subprocess.call(build_command) == 0: - build_result = True - else: - build_result = False + if os.path.exists(BUILD_FILEPATH): + build_command = [BUILD_FILEPATH] + build_command.extend(command) + LOG.info("build_command: %s" % str(build_command)) + if subprocess.call(build_command) == 0: + build_result = True else: - LOG.warning("Error: The %s is not exist" % BUILD_FILEPATH) + build_result = False else: - build_result = self._execute_build_lite_cmd() + LOG.warning("Error: The %s is not exist" % BUILD_FILEPATH) + os.chdir(current_path) return build_result - def _execute_build_lite_cmd(self): - if os.path.exists(BUILD_LITE): - temp_user_manager = UserConfigManager() - build_command_config = \ - temp_user_manager.get_user_config("build", "board_info") - build_command = [build_command_config.get("build_command", "")] - LOG.info("build_command: %s" % str(build_command)) - try: - if subprocess.call(build_command) == 0: - LOG.info("execute build lite command success") - return True - else: - LOG.error("execute build lite command success") - except IOError as exception: - LOG.error("build lite test case failed, exception=%s" - % exception) - else: - LOG.warning("Error: The %s is not exist" % BUILD_LITE) - return False - def build_fuzz_testcases(self, para): self._delete_testcase_dir(para.productform) helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py") @@ -246,17 +223,6 @@ class BuildTestcases(object): command.append(BUILD_TARGET_PLATFORM % productform) return self._execute_build_command(productform, command) - def build_all_testcases(self): - command = [] - if self.is_build_example: - command.append("--gn-args") - command.append("build_example=true") - command.append("--build-target") - command.append("make_test") - command.append("--gn-args") - command.append(BUILD_TARGET_PLATFORM % "all") - return self._execute_build_command(command) - ############################################################################## ############################################################################## diff --git a/src/core/command/console.py b/src/core/command/console.py index 8c88f88..a8b3822 100755 --- a/src/core/command/console.py +++ b/src/core/command/console.py @@ -30,6 +30,7 @@ from core.command.display import display_help_info from core.command.display import display_show_info from core.command.display import show_wizard_mode from core.config.config_manager import UserConfigManager +from core.utils import is_lite_product try: if platform.system() != 'Windows': @@ -238,17 +239,19 @@ class Console(object): LOG.warning("action is empty.") return + if "productform" in self.wizard_dic.keys(): + productform = self.wizard_dic["productform"] + options.productform = productform + else: + productform = options.productform + if command.startswith(ToolCommandType.TOOLCMD_KEY_HELP): self._process_command_help(para_list) elif command.startswith(ToolCommandType.TOOLCMD_KEY_SHOW): - if "productform" in self.wizard_dic: - productform = self.wizard_dic["productform"] self._process_command_show(para_list, productform) elif command.startswith(ToolCommandType.TOOLCMD_KEY_GEN): self._process_command_gen(command, options) elif command.startswith(ToolCommandType.TOOLCMD_KEY_RUN): - if "productform" in self.wizard_dic: - options.productform = self.wizard_dic["productform"] self._process_command_run(command, options) elif command.startswith(ToolCommandType.TOOLCMD_KEY_QUIT): self._process_command_quit(command) @@ -314,13 +317,23 @@ class Console(object): @classmethod def _build_version(cls, product_form): - build_result = True + is_build_version = UserConfigManager().get_user_config_flag( + "build", "version") + project_root_path = sys.source_code_root_path - if project_root_path != "": + if project_root_path == "": + return True + + build_result = True + if is_lite_product(product_form, sys.source_code_root_path): + if not is_build_version: + return True + from core.build.build_lite_manager import BuildLiteManager + build_lite_manager = BuildLiteManager(project_root_path) + build_result = build_lite_manager.build_version(product_form) + else: from core.build.build_manager import BuildManager build_manager = BuildManager() - is_build_version = UserConfigManager().get_user_config_flag( - "build", "version") if is_build_version: build_result = build_manager.build_version(project_root_path, product_form) diff --git a/src/core/command/run.py b/src/core/command/run.py index b8bc9fd..8b5cb5d 100755 --- a/src/core/command/run.py +++ b/src/core/command/run.py @@ -25,8 +25,10 @@ from xdevice import Plugin from xdevice import get_plugin from xdevice import platform_logger from xdevice import Scheduler +from xdevice import DeviceTestType from core.utils import get_build_output_path from core.utils import scan_support_product +from core.utils import is_lite_product from core.common import is_open_source_product from core.command.parameter import Parameter from core.testcase.testcase_manager import TestCaseManager @@ -132,6 +134,20 @@ class Run(object): if scheduler is None: LOG.error("Can not find the scheduler plugin.") else: + if is_lite_product(options.productform, + sys.source_code_root_path): + options.testcases_path = options.target_outpath + options.resource_path = os.path.abspath(os.path.join( + sys.framework_root_dir, "..", "resource")) + print(options.testcases_path) + print(options.resource_path) + if options.productform.find("wifiiot") != -1: + scheduler.update_test_type_in_source(".bin", + DeviceTestType.ctest_lite) + scheduler.update_ext_type_in_source("BIN", + DeviceTestType.ctest_lite) + else: + print("productform is not wifiiot") scheduler.exec_command(command, options) return @@ -178,26 +194,27 @@ class Run(object): @classmethod def get_tests_out_path(cls, product_form): - tests_out_path = UserConfigManager().get_test_cases_dir() - if tests_out_path == "": + testcase_path = UserConfigManager().get_test_cases_dir() + if testcase_path == "": all_product_list = scan_support_product() if product_form in all_product_list: if is_open_source_product(product_form): - tests_out_path = os.path.abspath(os.path.join( + testcase_path = os.path.abspath(os.path.join( get_build_output_path(product_form), "packages", "phone", "tests")) else: - tests_out_path = os.path.abspath(os.path.join( + testcase_path = os.path.abspath(os.path.join( get_build_output_path(product_form), "packages", product_form, "tests")) else: - tests_out_path = os.path.join( + testcase_path = os.path.join( get_build_output_path(product_form), "test") - return tests_out_path + LOG.info("testcase_path=%s" % testcase_path) + return testcase_path @classmethod def get_coverage_outpath(cls, options): diff --git a/src/core/driver/lite_driver.py b/src/core/driver/lite_driver.py index 22cb561..859ac58 100755 --- a/src/core/driver/lite_driver.py +++ b/src/core/driver/lite_driver.py @@ -25,10 +25,28 @@ from xdevice import DeviceTestType from xdevice import IDriver from xdevice import Plugin from xdevice import platform_logger +from xdevice import DeviceLabelType +from xdevice import ComType +from xdevice import ParserType +from xdevice import ShellHandler +from xdevice import ExecuteTerminate +from xdevice import LiteDeviceExecuteCommandError +from xdevice import get_plugin +from xdevice import JsonParser +from xdevice import get_config_value +from xdevice import get_kit_instances +from xdevice import check_result_report +from xdevice import get_device_log_file +from xdevice import get_test_component_version +from xdevice import ParamError +from core.utils import get_filename_extension +from core.testkit.kit_lite import DeployKit + from core.config.config_manager import UserConfigManager -__all__ = ["LiteUnitTest"] +__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"] +LOG = platform_logger("LiteUnitTest") def get_level_para_string(level_string): @@ -113,14 +131,14 @@ class LiteUnitTest(IDriver): if self.mnt_cmd == "mount ": self.log.error("no configure for mount command") return - + filter_result, status, _ = \ self.lite_device.execute_command_with_timeout( self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, timeout=3) if "already mounted" in filter_result: self.log.info("nfs has been mounted") return - + for i in range(0, 2): if status: self.log.info("execute mount command success") @@ -266,3 +284,266 @@ class LiteUnitTest(IDriver): def __result__(self): pass + + +@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite) +class CTestDriver(IDriver): + """ + CTest is a test that runs a native test package on given lite device. + """ + config = None + result = "" + error_message = "" + version_cmd = "AT+CSV" + + def __init__(self): + self.file_name = "" + + def __check_environment__(self, device_options): + if len(device_options) != 1 or \ + device_options[0].label != DeviceLabelType.wifiiot: + self.error_message = "check environment failed" + return False + return True + + def __check_config__(self, config=None): + del config + self.config = None + + def __execute__(self, request): + from xdevice import Variables + try: + self.config = request.config + self.config.device = request.config.environment.devices[0] + + if request.config.resource_path: + current_dir = request.config.resource_path + else: + current_dir = Variables.exec_dir + + config_file = request.root.source.config_file.strip() + if config_file: + source = os.path.join(current_dir, config_file) + self.file_name = os.path.basename(config_file).split(".")[0] + else: + source = request.root.source.source_string.strip() + + self._run_ctest(source=source, request=request) + + except (LiteDeviceExecuteCommandError, Exception) as exception: + LOG.error(exception, error_no=getattr(exception, "error_no", + "00000")) + self.error_message = exception + finally: + if request.root.source.test_name.startswith("{"): + report_name = "report" + else: + report_name = get_filename_extension( + request.root.source.test_name)[0] + + self.result = check_result_report(request.config.report_path, + self.result, + self.error_message, + report_name) + + def _run_ctest(self, source=None, request=None): + if not source: + LOG.error("Error: %s don't exist." % source, error_no="00101") + return + + try: + parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite) + version = get_test_component_version(self.config) + parser_instances = [] + for parser in parsers: + parser_instance = parser.__class__() + parser_instance.suites_name = self.file_name + parser_instance.product_info.setdefault("Version", version) + parser_instance.listeners = request.listeners + parser_instances.append(parser_instance) + handler = ShellHandler(parser_instances) + + reset_cmd = self._reset_device(request, source) + self.result = "%s.xml" % os.path.join(request.config.report_path, + "result", self.file_name) + + self.config.device.device.com_dict.get( + ComType.deploy_com).connect() + + result, _, error = self.config.device.device. \ + execute_command_with_timeout( + command=reset_cmd, + case_type=DeviceTestType.ctest_lite, + key=ComType.deploy_com, + timeout=90, + receiver=handler) + + device_log_file = get_device_log_file(request.config.report_path, + request.config.device. + __get_serial__()) + + device_log_file_open = os.open(device_log_file, os.O_WRONLY | + os.O_CREAT | os.O_APPEND, 0o755) + with os.fdopen(device_log_file_open, "a") as file_name: + file_name.write("{}{}".format( + "\n".join(result.split("\n")[0:-1]), "\n")) + file_name.flush() + finally: + self.config.device.device.com_dict.get(ComType.deploy_com).close() + + def _reset_device(self, request, source): + json_config = JsonParser(source) + reset_cmd = [] + kit_instances = get_kit_instances(json_config, + request.config.resource_path, + request.config.testcases_path) + from xdevice import Scheduler + + for (kit_instance, kit_info) in zip(kit_instances, + json_config.get_kits()): + if not isinstance(kit_instance, DeployKit): + continue + if not self.file_name: + self.file_name = get_config_value( + 'burn_file', kit_info)[0].split("\\")[-1].split(".")[0] + reset_cmd = kit_instance.burn_command + if not Scheduler.is_execute: + raise ExecuteTerminate("ExecuteTerminate", error_no="00300") + + kit_instance.__setup__(self.config.device, + source_file=request.root.source.source_file.strip()) + + reset_cmd = [int(item, 16) for item in reset_cmd] + return reset_cmd + + def __result__(self): + return self.result if os.path.exists(self.result) else "" + + + +@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite) +class JSUnitTestLiteDriver(IDriver): + """ + JSUnitTestDriver is a Test that runs a native test package on given device. + """ + + def __init__(self): + self.result = "" + self.error_message = "" + self.kits = [] + self.config = None + + def __check_environment__(self, device_options): + pass + + def __check_config__(self, config): + pass + + def _get_driver_config(self, json_config): + bundle_name = get_config_value('bundle-name', + json_config.get_driver(), False) + if not bundle_name: + raise ParamError("Can't find bundle-name in config file.", + error_no="00108") + else: + self.config.bundle_name = bundle_name + + ability = get_config_value('ability', + json_config.get_driver(), False) + if not ability: + self.config.ability = "default" + else: + self.config.ability = ability + + def __execute__(self, request): + try: + LOG.debug("Start execute xdevice extension JSUnit Test") + + self.config = request.config + self.config.device = request.config.environment.devices[0] + + config_file = request.root.source.config_file + suite_file = request.root.source.source_file + + if not suite_file: + raise ParamError( + "test source '%s' not exists" % + request.root.source.source_string, error_no="00101") + + if not os.path.exists(config_file): + LOG.error("Error: Test cases don't exist %s." % config_file, + error_no="00101") + raise ParamError( + "Error: Test cases don't exist %s." % config_file, + error_no="00101") + + self.file_name = os.path.basename( + request.root.source.source_file.strip()).split(".")[0] + + self.result = "%s.xml" % os.path.join( + request.config.report_path, "result", self.file_name) + + json_config = JsonParser(config_file) + self.kits = get_kit_instances(json_config, + self.config.resource_path, + self.config.testcases_path) + + self._get_driver_config(json_config) + + from xdevice import Scheduler + for kit in self.kits: + if not Scheduler.is_execute: + raise ExecuteTerminate("ExecuteTerminate", + error_no="00300") + if kit.__class__.__name__ == CKit.liteinstall: + kit.bundle_name = self.config.bundle_name + kit.__setup__(self.config.device, request=request) + + self._run_jsunit(request) + + except Exception as exception: + self.error_message = exception + finally: + report_name = "report" if request.root.source. \ + test_name.startswith("{") else get_filename_extension( + request.root.source.test_name)[0] + + self.result = check_result_report( + request.config.report_path, self.result, self.error_message, + report_name) + + for kit in self.kits: + kit.__teardown__(self.config.device) + + self.config.device.close() + + def _run_jsunit(self, request): + parser_instances = [] + parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite) + for parser in parsers: + parser_instance = parser.__class__() + parser_instance.suites_name = self.file_name + parser_instance.listeners = request.listeners + parser_instances.append(parser_instance) + handler = ShellHandler(parser_instances) + + command = "./bin/aa start -p %s -n %s" % \ + (self.config.bundle_name, self.config.ability) + result, _, error = self.config.device.execute_command_with_timeout( + command=command, + timeout=300, + receiver=handler) + + device_log_file = get_device_log_file(request.config.report_path, + request.config.device. + __get_serial__()) + + device_log_file_open = os.open(device_log_file, os.O_WRONLY | + os.O_CREAT | os.O_APPEND, 0o755) + with os.fdopen(device_log_file_open, "a") as file_name: + file_name.write("{}{}".format( + "\n".join(result.split("\n")[0:-1]), "\n")) + file_name.flush() + + def __result__(self): + return self.result if os.path.exists(self.result) else "" \ No newline at end of file diff --git a/src/core/testkit/__init__.py b/src/core/testkit/__init__.py new file mode 100755 index 0000000..a9c4807 --- /dev/null +++ b/src/core/testkit/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +# coding=utf-8 + +# +# Copyright (c) 2020 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/src/core/testkit/kit_lite.py b/src/core/testkit/kit_lite.py new file mode 100755 index 0000000..21967b7 --- /dev/null +++ b/src/core/testkit/kit_lite.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +# coding=utf-8 + +# +# Copyright (c) 2020-2021 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import re +import random +import string +import shutil +import subprocess + +from xdevice import platform_logger +from xdevice import Plugin +from xdevice import CKit +from xdevice import ComType +from xdevice import ParamError +from xdevice import LiteDeviceError +from xdevice import ITestKit +from xdevice import get_config_value +from xdevice import get_file_absolute_path +from xdevice import LiteDeviceConnectError +from xdevice import DeviceAllocationState + + +__all__ = ["DeployKit"] +LOG = platform_logger("KitLite") + +RESET_CMD = "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, " \ + "0x61, 0x94" + + +@Plugin(type=Plugin.TEST_KIT, id=CKit.deploy) +class DeployKit(ITestKit): + def __init__(self): + self.burn_file = "" + self.burn_command = "" + self.timeout = "" + self.paths = "" + + def __check_config__(self, config): + self.timeout = str(int(get_config_value( + 'timeout', config, is_list=False, default=0)) * 1000) + self.burn_file = get_config_value('burn_file', config, is_list=False) + burn_command = get_config_value('burn_command', config, is_list=False, + default=RESET_CMD) + self.burn_command = burn_command.replace(" ", "").split(",") + self.paths = get_config_value('paths', config) + if self.timeout == "0" or not self.burn_file: + msg = "The config for deploy kit is invalid with timeout:{}, " \ + "burn_file:{}".format(self.timeout, self.burn_file) + raise ParamError(msg, error_no="00108") + + def _reset(self, device): + cmd_com = device.device.com_dict.get(ComType.cmd_com) + try: + cmd_com.connect() + cmd_com.execute_command(command='AT+RST={}'.format(self.timeout)) + cmd_com.close() + except (LiteDeviceConnectError, IOError) as error: + device.device_allocation_state = DeviceAllocationState.unusable + LOG.error( + "The exception {} happened in deploy kit running".format( + error), error_no=getattr(error, "error_no", + "00000")) + raise LiteDeviceError("%s port set_up wifiiot failed" % + cmd_com.serial_port, + error_no=getattr(error, "error_no", + "00000")) + finally: + if cmd_com: + cmd_com.close() + + def _send_file(self, device, source_file): + burn_tool_name = "HiBurn.exe" if os.name == "nt" else "HiBurn" + burn_tool_path = get_file_absolute_path( + os.path.join("tools", burn_tool_name), self.paths) + + deploy_serial_port = device.device.com_dict.get( + ComType.deploy_com).serial_port + deploy_baudrate = device.device.com_dict.\ + get(ComType.deploy_com).baud_rate + port_number = re.findall(r'\d+$', deploy_serial_port) + if not port_number: + raise LiteDeviceError("The config of serial port {} to deploy is " + "invalid".format(deploy_serial_port), + error_no="00108") + new_temp_tool_path = self.copy_file_as_temp(burn_tool_path, 10) + cmd = '{} -com:{} -bin:{} -signalbaud:{}' \ + .format(new_temp_tool_path, port_number[0], source_file, + deploy_baudrate) + LOG.info('The running cmd is {}'.format(cmd)) + LOG.info('The burn tool is running, please wait..') + return_code, out = subprocess.getstatusoutput(cmd) + LOG.info( + 'Deploy kit to execute burn tool finished with return_code: {} ' + 'output: {}'.format(return_code, out)) + os.remove(new_temp_tool_path) + if 0 != return_code: + device.device_allocation_state = DeviceAllocationState.unusable + raise LiteDeviceError("%s port set_up wifiiot failed" % + deploy_serial_port, error_no="00402") + + def __setup__(self, device, **kwargs): + """ + Execute reset command on the device by cmd serial port and then upload + patch file by deploy tool. + Parameters: + device: the instance of LocalController with one or more + ComController + """ + args = kwargs + source_file = args.get("source_file", None) + self._reset(device) + self._send_file(device, source_file) + + def __teardown__(self, device): + pass + + def copy_file_as_temp(self, original_file, str_length): + """ + To obtain a random string with specified length + Parameters: + original_file : the original file path + str_length: the length of random string + """ + if os.path.isfile(original_file): + random_str = random.sample(string.ascii_letters + string.digits, + str_length) + new_temp_tool_path = '{}_{}{}'.format( + os.path.splitext(original_file)[0], "".join(random_str), + os.path.splitext(original_file)[1]) + return shutil.copyfile(original_file, new_temp_tool_path) \ No newline at end of file diff --git a/src/core/utils.py b/src/core/utils.py index 02f086f..482c42c 100755 --- a/src/core/utils.py +++ b/src/core/utils.py @@ -74,9 +74,10 @@ def get_device_log_file(report_path, serial=None, log_name="device_log"): def get_build_output_path(product_form): if sys.source_code_root_path == "": return "" + standard_large_system_list = scan_support_product() - property_info = parse_product_info(product_form) if product_form in standard_large_system_list: + property_info = parse_product_info(product_form) if property_info is not None: target_os = property_info.get("target_os") target_cpu = property_info.get("target_cpu") @@ -84,14 +85,15 @@ def get_build_output_path(product_form): else: return "" else: - para_dic = UserConfigManager().get_user_config("build", "board_info") - board_series = para_dic.get("board_series", "") - board_type = para_dic.get("board_type", "") - board_product = para_dic.get("board_product", "") - fist_build_output = "%s_%s" % (board_series, board_type) - second_build_output = "%s_%s" % (board_product, fist_build_output) - build_output_name = os.path.join(fist_build_output, + board_info_list = product_form.split("_") + if len(board_info_list) < 3: + return "" + + first_build_output = board_info_list[1] + "_" + board_info_list[2] + second_build_output = product_form + build_output_name = os.path.join(first_build_output, second_build_output) + build_output_path = os.path.join(sys.source_code_root_path, "out", build_output_name) @@ -126,6 +128,9 @@ def parse_product_info(product_form): product_form, "preloader", "build.prop") + if not os.path.exists(build_prop): + return {} + with open(build_prop, 'r') as pro_file: properties = {} for line in pro_file: @@ -154,6 +159,7 @@ def get_decode(stream): ret = str(stream) return ret + def parse_fuzzer_info(): path_list = [] bin_list = [] @@ -170,9 +176,17 @@ def parse_fuzzer_info(): bin_list.append(striped_str.split(":")[1].split("(")[0]) return path_list, bin_list + def get_fuzzer_path(filename): path_list, bin_list = parse_fuzzer_info() for i, name in enumerate(bin_list): if name == filename: return os.path.join(sys.source_code_root_path, path_list[i]) return "" + + +def is_lite_product(product_form, code_root_path): + if code_root_path is None or code_root_path == "": + return True if len(product_form.split("_")) >= 3 else False + else: + return True if product_form not in scan_support_product() else False