From 0fee664fec6a295770258b265c596f86581f3526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=9B=BD=E8=BE=89?= Date: Tue, 2 Jul 2024 09:05:51 +0000 Subject: [PATCH] =?UTF-8?q?Signed-off-by:=20=E9=BB=84=E5=9B=BD=E8=BE=89=20?= =?UTF-8?q??= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 黄国辉 --- aw/python/distributed/common/devices.py | 79 +-- aw/python/distributed/common/manager.py | 34 +- .../distributed/distribute/distribute.py | 204 +++--- libs/benchmark/report/benchmark_reporter.py | 1 + libs/benchmark/report/generate_report.py | 38 +- libs/fuzzlib/fuzzer_helper.py | 2 +- libs/fuzzlib/tools/colored.py | 44 +- libs/fuzzlib/tools/run_result.py | 6 +- libs/fuzzlib/tools/templates.py | 2 +- .../codeCoverage/mutilProcess_CodeCoverage.py | 2 +- .../interfaceCoverage_gcov_lcov.py | 3 +- .../interfaceCoverage/make_report.py | 23 +- .../keyword_registration/keyword_filter.py | 617 +++++++++--------- src/core/build/build_manager.py | 131 ++-- src/core/build/build_testcases.py | 206 +++--- src/core/build/select_targets.py | 97 ++- src/core/command/console.py | 168 ++--- src/core/command/display.py | 1 - src/core/command/gen.py | 29 +- src/core/command/run.py | 544 ++++++++------- src/core/config/config_manager.py | 38 +- src/core/config/resource_manager.py | 237 +++---- src/core/driver/drivers.py | 286 ++++---- src/core/driver/lite_driver.py | 81 +-- src/core/driver/openharmony.py | 47 +- src/core/testcase/testcase_manager.py | 262 ++++---- src/core/testkit/kit_lite.py | 66 +- 27 files changed, 1620 insertions(+), 1628 deletions(-) diff --git a/aw/python/distributed/common/devices.py b/aw/python/distributed/common/devices.py index e0443f3..a823586 100755 --- a/aw/python/distributed/common/devices.py +++ b/aw/python/distributed/common/devices.py @@ -122,6 +122,45 @@ class DeviceShell: return device_para + @classmethod + def execute_command(cls, command, print_flag=True, timeout=900): + try: + if print_flag: + print("command: " + command) + if subprocess.call(command, shell=True, timeout=timeout) == 0: + print("results: successed") + return True + except Exception as error: + print("Exception: %s" % str(error)) + print("results: failed") + return False + + @classmethod + def execute_command_with_output(cls, command, print_flag=True): + if print_flag: + print("command: " + command) + + proc = subprocess.Popen(command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True) + + result = "" + try: + data, _ = proc.communicate() + if isinstance(data, bytes): + result = data.decode('utf-8', 'ignore') + finally: + proc.stdout.close() + proc.stderr.close() + return result if result else data + + @classmethod + def check_path_legal(cls, path): + if path and " " in path: + return "\"%s\"" % path + return path + def remount(self): if self.conn_type: remount = "target mount" @@ -185,19 +224,6 @@ class DeviceShell: command, QUOTATION_MARKS)) - @classmethod - def execute_command(cls, command, print_flag=True, timeout=900): - try: - if print_flag: - print("command: " + command) - if subprocess.call(command, shell=True, timeout=timeout) == 0: - print("results: successed") - return True - except Exception as error: - print("Exception: %s" % str(error)) - print("results: failed") - return False - def shell_with_output(self, command=""): return self.execute_command_with_output("%s %s shell %s%s%s" % ( HDC_TOOLS, @@ -205,29 +231,4 @@ class DeviceShell: QUOTATION_MARKS, command, QUOTATION_MARKS)) - - @classmethod - def execute_command_with_output(cls, command, print_flag=True): - if print_flag: - print("command: " + command) - - proc = subprocess.Popen(command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True) - - result = "" - try: - data, _ = proc.communicate() - if isinstance(data, bytes): - result = data.decode('utf-8', 'ignore') - finally: - proc.stdout.close() - proc.stderr.close() - return result if result else data - - @classmethod - def check_path_legal(cls, path): - if path and " " in path: - return "\"%s\"" % path - return path \ No newline at end of file + \ No newline at end of file diff --git a/aw/python/distributed/common/manager.py b/aw/python/distributed/common/manager.py index ef58315..3fd8cc8 100755 --- a/aw/python/distributed/common/manager.py +++ b/aw/python/distributed/common/manager.py @@ -32,6 +32,23 @@ class DeviceManager: self.tv_device_list = [] self.watch_device_list = [] self.make_device_list(result_path) + + @staticmethod + def get_device_info_list(result): + device_info_list = [] + tmp_path = os.path.join(result, "temp") + device_info_file_path = os.path.join(tmp_path, + "device_info_file.txt") + + if os.path.exists(device_info_file_path): + with open(device_info_file_path, "r") as file_handle: + lines = file_handle.readlines() + for line in lines: + line = line.replace("\n", "") + line = line.strip() + temp = line.split(",") + device_info_list.append(temp) + return device_info_list def make_device_adapter(self, device_info_list, device_name): device = DeviceShell(self.is_hdc, device_sn=device_info_list[0], @@ -71,22 +88,5 @@ class DeviceManager: setattr(self, device.name, device) return - @staticmethod - def get_device_info_list(result): - device_info_list = [] - tmp_path = os.path.join(result, "temp") - device_info_file_path = os.path.join(tmp_path, - "device_info_file.txt") - - if os.path.exists(device_info_file_path): - with open(device_info_file_path, "r") as file_handle: - lines = file_handle.readlines() - for line in lines: - line = line.replace("\n", "") - line = line.strip() - temp = line.split(",") - device_info_list.append(temp) - return device_info_list - ############################################################################## ############################################################################## diff --git a/aw/python/distributed/distribute/distribute.py b/aw/python/distributed/distribute/distribute.py index 1e68bd5..ad29c82 100755 --- a/aw/python/distributed/distribute/distribute.py +++ b/aw/python/distributed/distribute/distribute.py @@ -85,40 +85,6 @@ class Distribute: self.agent_list = agent_list self.hdc_tools = hdc_tools - def exec_agent(self, device, target_name, result_path, options): - driver = get_current_driver(device, target_name, self.hdc_tools) - if driver is None: - print("Error: driver is None.") - return False - - resource_dir = get_resource_dir(self.suite_dir, device.name) - - self._make_agent_desc_file(device) - device.push_file(os.path.join(self.suite_dir, "agent.desc"), - device.test_path) - device.push_file(os.path.join(resource_dir, target_name), - device.test_path) - - suite_path = os.path.join(self.suite_dir, target_name) - driver.execute(suite_path, result_path, options, background=True) - return self._check_thread(device, target_name) - - def exec_major(self, device, target_name, result_path, options): - driver = get_current_driver(device, target_name, self.hdc_tools) - if driver is None: - print("Error: driver is None.") - return False - - resource_dir = get_resource_dir(self.suite_dir, device.name) - self._make_major_desc_file() - device.push_file(os.path.join(self.suite_dir, "major.desc"), - device.test_path) - device.push_file(os.path.join(resource_dir, target_name), - device.test_path) - - suite_path = os.path.join(self.suite_dir, target_name) - return driver.execute(suite_path, result_path, options, background=False) - @staticmethod def pull_result(device, source_path, result_save_path): _, file_name = os.path.split(source_path) @@ -141,75 +107,6 @@ class Distribute: break return True if checksum < 100 else False - def _make_agent_desc_file(self, device): - agent_ip_list = "" - device_uuid_list = "" - - if self.hdc_tools != "hdc": - if self._query_device_uuid(self.major) != '': - device_uuid_list += self._query_device_uuid(self.major) + "," - - if self._query_device_ip(device) != "": - agent_ip_list += self._query_device_ip(device) + "," - - for agent in self.agent_list: - if self._query_device_uuid(agent): - device_uuid_list += self._query_device_uuid(agent) + "," - - config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", - device_uuid_list) - - config_agent_file = os.path.join(self.suite_dir, "agent.desc") - self._write_device_config(config_info, config_agent_file) - else: - if self._query_device_agent_uuid(self.major): - device_uuid_list += self._query_device_agent_uuid(self.major) + "," - - if self._query_device_hdc_ip(device): - agent_ip_list += self._query_device_hdc_ip(device) + "," - - for agent in self.agent_list: - if self._query_device_agent_uuid(agent): - device_uuid_list += self._query_device_agent_uuid(agent) + "," - - config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", - device_uuid_list) - - config_agent_file = os.path.join(self.suite_dir, "agent.desc") - self._write_device_config(config_info, config_agent_file) - - def _make_major_desc_file(self): - agent_ip_list = "" - device_uuid_list = "" - - if self.hdc_tools != "hdc": - if self._query_device_uuid(self.major) != "NoneTyple": - device_uuid_list += self._query_device_uuid(self.major) + "," - - for agent in self.agent_list: - if self._query_device_ip(agent) != "" and self._query_device_uuid(agent) != "": - agent_ip_list += self._query_device_ip(agent) + "," - device_uuid_list += self._query_device_uuid(agent) + "," - - config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", - device_uuid_list) - - config_major_file = os.path.join(self.suite_dir, "major.desc") - self._write_device_config(config_info, config_major_file) - else: - if self._query_device_major_uuid(self.major): - device_uuid_list += self._query_device_major_uuid(self.major) + "," - - for agent in self.agent_list: - if self._query_device_major_uuid(agent): - agent_ip_list += self._query_device_hdc_ip(agent) + "," - device_uuid_list += self._query_device_major_uuid(agent) + "," - config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", - device_uuid_list) - - config_major_file = os.path.join(self.suite_dir, "major.desc") - self._write_device_config(config_info, config_major_file) - @staticmethod def _query_device_ip(device): output = device.shell_with_output("getprop ro.hardware") @@ -317,7 +214,108 @@ class Distribute: file_desc.write(device_info) os.rename(file_path, final_file) + def exec_agent(self, device, target_name, result_path, options): + driver = get_current_driver(device, target_name, self.hdc_tools) + if driver is None: + print("Error: driver is None.") + return False + resource_dir = get_resource_dir(self.suite_dir, device.name) + + self._make_agent_desc_file(device) + device.push_file(os.path.join(self.suite_dir, "agent.desc"), + device.test_path) + device.push_file(os.path.join(resource_dir, target_name), + device.test_path) + + suite_path = os.path.join(self.suite_dir, target_name) + driver.execute(suite_path, result_path, options, background=True) + return self._check_thread(device, target_name) + + def exec_major(self, device, target_name, result_path, options): + driver = get_current_driver(device, target_name, self.hdc_tools) + if driver is None: + print("Error: driver is None.") + return False + + resource_dir = get_resource_dir(self.suite_dir, device.name) + self._make_major_desc_file() + device.push_file(os.path.join(self.suite_dir, "major.desc"), + device.test_path) + device.push_file(os.path.join(resource_dir, target_name), + device.test_path) + + suite_path = os.path.join(self.suite_dir, target_name) + return driver.execute(suite_path, result_path, options, background=False) + + def _make_agent_desc_file(self, device): + agent_ip_list = "" + device_uuid_list = "" + + if self.hdc_tools != "hdc": + if self._query_device_uuid(self.major) != '': + device_uuid_list += self._query_device_uuid(self.major) + "," + + if self._query_device_ip(device) != "": + agent_ip_list += self._query_device_ip(device) + "," + + for agent in self.agent_list: + if self._query_device_uuid(agent): + device_uuid_list += self._query_device_uuid(agent) + "," + + config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", + device_uuid_list) + + config_agent_file = os.path.join(self.suite_dir, "agent.desc") + self._write_device_config(config_info, config_agent_file) + else: + if self._query_device_agent_uuid(self.major): + device_uuid_list += self._query_device_agent_uuid(self.major) + "," + + if self._query_device_hdc_ip(device): + agent_ip_list += self._query_device_hdc_ip(device) + "," + + for agent in self.agent_list: + if self._query_device_agent_uuid(agent): + device_uuid_list += self._query_device_agent_uuid(agent) + "," + + config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", + device_uuid_list) + + config_agent_file = os.path.join(self.suite_dir, "agent.desc") + self._write_device_config(config_info, config_agent_file) + + def _make_major_desc_file(self): + agent_ip_list = "" + device_uuid_list = "" + + if self.hdc_tools != "hdc": + if self._query_device_uuid(self.major) != "NoneTyple": + device_uuid_list += self._query_device_uuid(self.major) + "," + + for agent in self.agent_list: + if self._query_device_ip(agent) != "" and self._query_device_uuid(agent) != "": + agent_ip_list += self._query_device_ip(agent) + "," + device_uuid_list += self._query_device_uuid(agent) + "," + + config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", + device_uuid_list) + + config_major_file = os.path.join(self.suite_dir, "major.desc") + self._write_device_config(config_info, config_major_file) + else: + if self._query_device_major_uuid(self.major): + device_uuid_list += self._query_device_major_uuid(self.major) + "," + + for agent in self.agent_list: + if self._query_device_major_uuid(agent): + agent_ip_list += self._query_device_hdc_ip(agent) + "," + device_uuid_list += self._query_device_major_uuid(agent) + "," + config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888", + device_uuid_list) + + config_major_file = os.path.join(self.suite_dir, "major.desc") + self._write_device_config(config_info, config_major_file) ############################################################################## ############################################################################## diff --git a/libs/benchmark/report/benchmark_reporter.py b/libs/benchmark/report/benchmark_reporter.py index b90bf25..c95772c 100644 --- a/libs/benchmark/report/benchmark_reporter.py +++ b/libs/benchmark/report/benchmark_reporter.py @@ -29,6 +29,7 @@ from _core.logger import platform_logger __all__ = ["BenchmarkReporter"] LOG = platform_logger("BenchmarkReporter") + @Plugin(type=Plugin.REPORTER, id=TestType.benchmark) class BenchmarkReporter(IReporter): diff --git a/libs/benchmark/report/generate_report.py b/libs/benchmark/report/generate_report.py index 264b51a..39ec4f7 100644 --- a/libs/benchmark/report/generate_report.py +++ b/libs/benchmark/report/generate_report.py @@ -27,6 +27,7 @@ MODES = stat.S_IWUSR | stat.S_IRUSR SETTING_RED_STYLE = """\033[33;31m%s\033[0m""" + def load_json_data(json_file_path): json_data = {} if os.path.isfile(json_file_path): @@ -68,6 +69,7 @@ def get_file_list_by_postfix(path, postfix, filter_jar=""): file_list.append(file_path) return file_list + class BenchmarkReport(object): SUBSYSTEM_SUMMARY = "OHOS_SUBSYSTEM_SUMMARY" ENABLE_LINK = "OHOS_ENABLE_PASSCASE_LINK" @@ -88,24 +90,7 @@ class BenchmarkReport(object): self.sbs_mdl_summary_list = [] self.benchmark_list = [] self._init_default_item() - - def _init_default_item(self): - self.default_item.append("Subsystem") - self.default_item.append("Module") - self.default_item.append("Testsuit") - self.default_item.append("Benchmark") - self.default_item.append("Mode") - self.default_item.append("RunType") - self.default_item.append("TestTargetName") - self.default_item.append("TestTargetMethod") - self.default_item.append("Repetitions") - self.default_item.append("RepetitionIndex") - self.default_item.append("Threads") - self.default_item.append("Iterations") - self.default_item.append("Score") - self.default_item.append("CpuTime") - self.max_index = len(self.default_item) + 1000 - + def generate_benchmark(self, args): if args is None or len(args) <= 2: print(SETTING_RED_STYLE % @@ -129,6 +114,23 @@ class BenchmarkReport(object): self._generate_benchmark_summary_report(os.path.abspath(dest_path)) self._generate_all_benchmark_detail(os.path.abspath(dest_path)) + def _init_default_item(self): + self.default_item.append("Subsystem") + self.default_item.append("Module") + self.default_item.append("Testsuit") + self.default_item.append("Benchmark") + self.default_item.append("Mode") + self.default_item.append("RunType") + self.default_item.append("TestTargetName") + self.default_item.append("TestTargetMethod") + self.default_item.append("Repetitions") + self.default_item.append("RepetitionIndex") + self.default_item.append("Threads") + self.default_item.append("Iterations") + self.default_item.append("Score") + self.default_item.append("CpuTime") + self.max_index = len(self.default_item) + 1000 + def _remove_iterations(self, mdl_summary_list): final_mdl_summary = [] for item_info in mdl_summary_list: diff --git a/libs/fuzzlib/fuzzer_helper.py b/libs/fuzzlib/fuzzer_helper.py index 63dcb16..c1702f0 100644 --- a/libs/fuzzlib/fuzzer_helper.py +++ b/libs/fuzzlib/fuzzer_helper.py @@ -166,7 +166,7 @@ def generate(args): #complie fuzzer project -def make(args, stdout=None): +def make(args, stdout=None): """make fuzzer module.""" color_logger = Colored.get_project_logger() diff --git a/libs/fuzzlib/tools/colored.py b/libs/fuzzlib/tools/colored.py index 865c1f3..9d2b267 100644 --- a/libs/fuzzlib/tools/colored.py +++ b/libs/fuzzlib/tools/colored.py @@ -56,6 +56,26 @@ class Colored(object): self.log_project = log_project self.log_date = time.strftime("%Y%m%d%H%M%S", time.localtime()) + @staticmethod + def get_fuzz_log_dir(): + return Colored.LOG_DIR + + @staticmethod + def log_task_init(project): + Colored.LOG_TO_FILE = True + Colored.LOG_PROJECT = project + Colored.LOG_DATE = time.strftime("%Y%m%d%H%M%S", time.localtime()) + + if not os.path.exists(Colored.LOG_DIR): + os.mkdir(Colored.LOG_DIR) + + project_log_dir = Colored.get_fuzz_project_log_dir() + if not os.path.exists(project_log_dir): + os.mkdir(project_log_dir) + + current_project_log_dir = Colored.get_fuzz_current_project_log_dir() + if not os.path.exists(current_project_log_dir): + os.mkdir(current_project_log_dir) def start_log_file(self): self.is_log_file = True @@ -100,7 +120,7 @@ class Colored(object): Colored.RESET ) else: - msg = '{}{}{}'.format( + msg = '{}{}{}'.format( getattr(Colored, color), s, Colored.RESET @@ -137,25 +157,3 @@ class Colored(object): def simple_print(self, s): self.loghook(s) print(s) - - - @staticmethod - def get_fuzz_log_dir(): - return Colored.LOG_DIR - - @staticmethod - def log_task_init(project): - Colored.LOG_TO_FILE = True - Colored.LOG_PROJECT = project - Colored.LOG_DATE = time.strftime("%Y%m%d%H%M%S", time.localtime()) - - if not os.path.exists(Colored.LOG_DIR): - os.mkdir(Colored.LOG_DIR) - - project_log_dir = Colored.get_fuzz_project_log_dir() - if not os.path.exists(project_log_dir): - os.mkdir(project_log_dir) - - current_project_log_dir = Colored.get_fuzz_current_project_log_dir() - if not os.path.exists(current_project_log_dir): - os.mkdir(current_project_log_dir) diff --git a/libs/fuzzlib/tools/run_result.py b/libs/fuzzlib/tools/run_result.py index f8f5e59..875403b 100644 --- a/libs/fuzzlib/tools/run_result.py +++ b/libs/fuzzlib/tools/run_result.py @@ -56,9 +56,6 @@ class RunResult(): "report_progress": 0 } - def get_log(self): - return "code :{}, msg: {}".format(self.code, self.data) - @staticmethod def filter_log(log_str): ansi_escape = re.compile(r''' @@ -75,6 +72,9 @@ class RunResult(): result = ansi_escape.sub('', log_str) return result + def get_log(self): + return "code :{}, msg: {}".format(self.code, self.data) + def analysis(self, result, outdir): pass diff --git a/libs/fuzzlib/tools/templates.py b/libs/fuzzlib/tools/templates.py index deb5675..5a3cde3 100644 --- a/libs/fuzzlib/tools/templates.py +++ b/libs/fuzzlib/tools/templates.py @@ -186,7 +186,7 @@ REPORT_CSS_TEMPLATE = """ def render_tbody(data): - res = "" + res = "" for row in data: row_line = "" for row_td in row: diff --git a/localCoverage/codeCoverage/mutilProcess_CodeCoverage.py b/localCoverage/codeCoverage/mutilProcess_CodeCoverage.py index 4fde91a..48b5045 100644 --- a/localCoverage/codeCoverage/mutilProcess_CodeCoverage.py +++ b/localCoverage/codeCoverage/mutilProcess_CodeCoverage.py @@ -202,7 +202,7 @@ def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path): def cut_info(subsystem, test_dir): trace_file = os.path.join( CODEPATH, REPORT_PATH, "single_test", - test_dir, f"{subsystem}_output.info" + test_dir, f"{subsystem}_output.info" ) output_name = os.path.join( CODEPATH, REPORT_PATH, "single_test", diff --git a/localCoverage/interfaceCoverage/interfaceCoverage_gcov_lcov.py b/localCoverage/interfaceCoverage/interfaceCoverage_gcov_lcov.py index 2c2e219..1dc7f67 100644 --- a/localCoverage/interfaceCoverage/interfaceCoverage_gcov_lcov.py +++ b/localCoverage/interfaceCoverage/interfaceCoverage_gcov_lcov.py @@ -246,8 +246,7 @@ def get_para_sub_string(content): parentheses_list_left = [] parentheses_list_right = [] - for index in range(len(content)): - char = content[index] + for index, char in enumerate(content): if "<" == char: if 0 == len(parentheses_list_left): start_index = index diff --git a/localCoverage/interfaceCoverage/make_report.py b/localCoverage/interfaceCoverage/make_report.py index 7195842..11edcad 100644 --- a/localCoverage/interfaceCoverage/make_report.py +++ b/localCoverage/interfaceCoverage/make_report.py @@ -16,20 +16,19 @@ # limitations under the License. # - +import os import sys +import stat import datetime from importlib import reload + reload(sys) -import os -import stat FLAGS_WRITE = os.O_WRONLY | os.O_CREAT | os.O_EXCL FLAGS_ADD = os.O_WRONLY | os.O_APPEND | os.O_CREAT MODES = stat.S_IWUSR | stat.S_IRUSR - -html_head = """ +HTML_HEAD = """ @@ -75,12 +74,12 @@ html_head = """ """ -html_body_start = """ +HTML_BODY_START = """ """ -html_body_ended = """ +HTML_BODY_ENDED = """ """ -html_ended = """ +HTML_ENDED = """ """ @@ -97,8 +96,8 @@ def create_html_start(reportpath): if os.path.exists(reportpath): os.remove(reportpath) with os.fdopen(os.open(reportpath, FLAGS_WRITE, MODES), 'w') as report: - report.write(html_head) - report.write(html_body_start) + report.write(HTML_HEAD) + report.write(HTML_BODY_START) except(IOError, ValueError): print("Error for create html start ",) @@ -244,7 +243,7 @@ def create_table_test(reportpath, subsystem_name, datalist, total_count, covered def create_html_ended(reportpath): try: with os.fdopen(os.open(reportpath, FLAGS_ADD, MODES), 'a') as report: - report.write(html_body_ended) - report.write(html_ended) + report.write(HTML_BODY_ENDED) + report.write(HTML_ENDED) except(IOError, ValueError): print("Error for create html end") diff --git a/localCoverage/keyword_registration/keyword_filter.py b/localCoverage/keyword_registration/keyword_filter.py index b5189ad..80ea460 100644 --- a/localCoverage/keyword_registration/keyword_filter.py +++ b/localCoverage/keyword_registration/keyword_filter.py @@ -26,6 +26,7 @@ import platform import linecache import traceback from multiprocessing import Pool + from lxml import html from selectolax.parser import HTMLParser @@ -94,18 +95,6 @@ class KeywordRegistration: self.keyword_file_path = os.path.normcase( os.path.join(os.path.dirname(__file__), "keyword.json")) - def get_keyword_info(self): - """ - 获取报备关键字信息 - """ - try: - with open(self.keyword_file_path, "r") as file: - keyword_dict = json.load(file) - keyword_list = keyword_dict.get("KEYWORD") - return keyword_list - except (FileNotFoundError, AttributeError, FileExistsError): - return [] - @staticmethod def get_coverage_content(file_path): """ @@ -152,6 +141,206 @@ class KeywordRegistration: tag = tag.replace(item, replace_item) return tag + @staticmethod + def get_branch_line_list(keyword_line: int, branch_line_list: list): + """ + 获取大于关键字行号的所有分支行号 + """ + if keyword_line in branch_line_list: + index = branch_line_list.index(keyword_line) + branch_line_list = branch_line_list[index:] + else: + for line in branch_line_list: + if line > keyword_line: + index = branch_line_list.index(line) + branch_line_list = branch_line_list[index:] + break + return branch_line_list + + @staticmethod + def get_keyword_judge_char(keyword, keyword_source_code): + """ + 获取关键字替代字符 + """ + if "&" in keyword: + keyword = keyword.replace("&", "<") + + keyword_index = keyword_source_code.find(keyword) + if keyword_index == -1: + return "" + + try: + keyword_code = keyword_source_code[:keyword_index + len(keyword)] + if " = " in keyword_code: + judge_key = keyword_code.split(" = ")[0].split()[-1] + else: + bracket_index = keyword_code.find("(") + bracket_code = keyword_code[:bracket_index] + judge_key = bracket_code.split()[-1] + return judge_key + except (IndexError, ValueError): + return "" + + @staticmethod + def get_branch_data_by_tag(tag_html: str, symbol_status=None): + """ + 根据前端标签获取分支数据 + """ + if symbol_status: + key = r"#+\-*" + else: + key = r"#+\-" + branch_line_list = re.findall(rf"> ([{key}]) ", tag_html) + + return branch_line_list + + @staticmethod + def get_judge_condition_index(judge_key: str, source_code: str): + """ + 获取判断条件索引 + """ + keyword_index_list = [] + keyword_index_list_append = keyword_index_list.append + + condition_str_list = re.split(rf"\|\||&&", source_code) + for index, code_str in enumerate(condition_str_list): + if judge_key in code_str: + keyword_index_list_append(index) + return keyword_index_list, condition_str_list + + @staticmethod + def update_source_code_tag(html_tag: str): + replace_item_list = ["lineNum", "lineCov", "lineNoCov"] + + for item in replace_item_list: + if item in html_tag: + replace_item = (item + "Update").lower() + html_tag = html_tag.replace(item, replace_item) + + return html_tag + + @staticmethod + def modify_tag_style(tag, rate): + """ + 修改标签样式 + """ + if 75 <= rate < 90: + tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryMed") + tag = tag.replace("coverPerLo", "coverPerMed") + tag = tag.replace("coverNumLo", "coverNumMed") + elif rate >= 90: + tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryHi") + tag = tag.replace("headerCovTableEntryMed", "headerCovTableEntryHi") + tag = tag.replace("coverPerLo", "coverPerHi") + tag = tag.replace("coverNumLo", "coverNumHi") + tag = tag.replace("coverPerMed", "coverPerHi") + tag = tag.replace("coverNumMed", "coverNumHi") + return tag + + @staticmethod + def _branch_replace(branch): + """ + 分支符号替换 + """ + if branch == "#": + branch = "> # <" + elif branch == "+": + branch = "> + <" + elif branch == "*": + branch = "> * <" + else: + branch = "> - <" + return branch + + @staticmethod + def _single_condition_modify_html(branch_html, branch_list): + """ + 单条件修改代码块html + """ + line_item = ' ' + line_feed_index = branch_html.find(line_item) + if line_feed_index == -1: + if "+" in branch_list: + update_branch_tag = branch_html.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + else: + try: + first_branch = branch_list[0] + first_branch = "> " + first_branch + " <" + first_branch_index = branch_html.find(first_branch) + branch_tag = branch_html[:first_branch_index + 5] + update_branch_tag = branch_html[first_branch_index + 5:] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + update_branch_tag = branch_tag + update_branch_tag + except ValueError: + return "" + else: + line_feed_index = branch_html.find(line_item) + update_branch_tag = branch_html[:line_feed_index + len(line_item) + 1] + if "-" not in branch_list and "+" not in branch_list: + del_count = update_branch_tag.count("> # <") + update_branch_tag = update_branch_tag.replace("> # <", "> <", del_count - 1) + else: + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + branch_tag = branch_html[line_feed_index + len(line_item) + 1:] + line_feed_index = branch_tag.find(line_item) + if line_feed_index == -1: + branch_tag = branch_tag.replace("> - <", "> <") + branch_tag = branch_tag.replace("> # <", "> <") + update_branch_tag += branch_tag + else: + loop_count = 0 + while line_feed_index + 1: + loop_count += 1 + if loop_count > 200: + continue + try: + update_branch_tag += branch_tag[:line_feed_index + len(line_item) + 1] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + branch_tag = branch_tag[line_feed_index + len(line_item) + 1:] + line_feed_index = branch_tag.find(line_item) + except ValueError: + return "" + + branch_tag = branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + update_branch_tag += branch_tag + return update_branch_tag + + def get_break_line_tag(self, content, origin_branch_html, branch_line): + """ + 获取分支换行的判断条件源码tag + """ + get_tag = self.get_tag + left_brace_exist = False + line_break = loop_count = 0 + while not left_brace_exist: + if loop_count > 10: + break + line_break += 1 + origin_branch_html = os.path.join(origin_branch_html, "\n") + next_line_tag = get_tag(content, branch_line + line_break) + if "{" in next_line_tag: + left_brace_exist = True + origin_branch_html += next_line_tag + loop_count += 1 + return origin_branch_html + + def get_keyword_info(self): + """ + 获取报备关键字信息 + """ + try: + with open(self.keyword_file_path, "r") as file: + keyword_dict = json.load(file) + keyword_list = keyword_dict.get("KEYWORD") + return keyword_list + except (FileNotFoundError, AttributeError, FileExistsError): + return [] + def get_coverage_lines_by_branch(self, file_path, content=None): """ 获取覆盖率报告中的所有的if分支行号 @@ -308,121 +497,6 @@ class KeywordRegistration: return function_name return function_name - @staticmethod - def get_branch_line_list(keyword_line: int, branch_line_list: list): - """ - 获取大于关键字行号的所有分支行号 - """ - if keyword_line in branch_line_list: - index = branch_line_list.index(keyword_line) - branch_line_list = branch_line_list[index:] - else: - for line in branch_line_list: - if line > keyword_line: - index = branch_line_list.index(line) - branch_line_list = branch_line_list[index:] - break - return branch_line_list - - @staticmethod - def get_keyword_judge_char(keyword, keyword_source_code): - """ - 获取关键字替代字符 - """ - if "&" in keyword: - keyword = keyword.replace("&", "<") - - keyword_index = keyword_source_code.find(keyword) - if keyword_index == -1: - return "" - - try: - keyword_code = keyword_source_code[:keyword_index + len(keyword)] - if " = " in keyword_code: - judge_key = keyword_code.split(" = ")[0].split()[-1] - else: - bracket_index = keyword_code.find("(") - bracket_code = keyword_code[:bracket_index] - judge_key = bracket_code.split()[-1] - return judge_key - except (IndexError, ValueError): - return "" - - @staticmethod - def get_branch_data_by_tag(tag_html: str, symbol_status=None): - """ - 根据前端标签获取分支数据 - """ - if symbol_status: - key = r"#+\-*" - else: - key = r"#+\-" - branch_line_list = re.findall(rf"> ([{key}]) ", tag_html) - - return branch_line_list - - @staticmethod - def get_judge_condition_index(judge_key: str, source_code: str): - """ - 获取判断条件索引 - """ - keyword_index_list = [] - keyword_index_list_append = keyword_index_list.append - - condition_str_list = re.split(rf"\|\||&&", source_code) - for index, code_str in enumerate(condition_str_list): - if judge_key in code_str: - keyword_index_list_append(index) - return keyword_index_list, condition_str_list - - @staticmethod - def update_source_code_tag(html_tag: str): - replace_item_list = ["lineNum", "lineCov", "lineNoCov"] - - for item in replace_item_list: - if item in html_tag: - replace_item = (item + "Update").lower() - html_tag = html_tag.replace(item, replace_item) - - return html_tag - - def get_break_line_tag(self, content, origin_branch_html, branch_line): - """ - 获取分支换行的判断条件源码tag - """ - get_tag = self.get_tag - left_brace_exist = False - line_break = loop_count = 0 - while not left_brace_exist: - if loop_count > 10: - break - line_break += 1 - origin_branch_html = os.path.join(origin_branch_html, "\n") - next_line_tag = get_tag(content, branch_line + line_break) - if "{" in next_line_tag: - left_brace_exist = True - origin_branch_html += next_line_tag - loop_count += 1 - return origin_branch_html - - @staticmethod - def modify_tag_style(tag, rate): - """ - 修改标签样式 - """ - if 75 <= rate < 90: - tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryMed") - tag = tag.replace("coverPerLo", "coverPerMed") - tag = tag.replace("coverNumLo", "coverNumMed") - elif rate >= 90: - tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryHi") - tag = tag.replace("headerCovTableEntryMed", "headerCovTableEntryHi") - tag = tag.replace("coverPerLo", "coverPerHi") - tag = tag.replace("coverNumLo", "coverNumHi") - tag = tag.replace("coverPerMed", "coverPerHi") - tag = tag.replace("coverNumMed", "coverNumHi") - return tag - def update_coverage_ratio_tag(self, file_path): """ 修改覆盖率比率数据 @@ -583,187 +657,6 @@ class KeywordRegistration: except (IndexError, TypeError, FileNotFoundError): pass - def _check_if_branch_line(self, judge_key, sub_branch_line_list, - key_line, content, function_name): - """ - 确定if分支行号 - """ - if_branch_line = None - for branch_line in sub_branch_line_list: - if branch_line == key_line: - if_branch_line = key_line - break - # 获取分支行所在函数名 - branch_function_name = self.get_line_funcname( - branch_line, content) - # 关键字范围只在关键字所在函数之内 - branch_line_tag = self.get_tag(content, branch_line) - try: - if "{" not in branch_line_tag: - branch_line_tag = self.get_break_line_tag( - content, branch_line_tag, branch_line) - branch_line_source_code = self.get_source_code( - branch_line_tag) - if function_name == branch_function_name: - if judge_key in branch_line_source_code: - if_branch_line = branch_line - break - else: - break - except (ValueError, KeyError): - pass - - return if_branch_line - - @staticmethod - def _branch_replace(branch): - """ - 分支符号替换 - """ - if branch == "#": - branch = "> # <" - elif branch == "+": - branch = "> + <" - elif branch == "*": - branch = "> * <" - else: - branch = "> - <" - return branch - - @staticmethod - def _single_condition_modify_html(branch_html, branch_list): - """ - 单条件修改代码块html - """ - line_item = ' ' - line_feed_index = branch_html.find(line_item) - if line_feed_index == -1: - if "+" in branch_list: - update_branch_tag = branch_html.replace("> - <", "> <") - update_branch_tag = update_branch_tag.replace("> # <", "> <") - else: - try: - first_branch = branch_list[0] - first_branch = "> " + first_branch + " <" - first_branch_index = branch_html.find(first_branch) - branch_tag = branch_html[:first_branch_index + 5] - update_branch_tag = branch_html[first_branch_index + 5:] - update_branch_tag = update_branch_tag.replace("> - <", "> <") - update_branch_tag = update_branch_tag.replace("> # <", "> <") - update_branch_tag = branch_tag + update_branch_tag - except ValueError: - return "" - else: - line_feed_index = branch_html.find(line_item) - update_branch_tag = branch_html[:line_feed_index + len(line_item) + 1] - if "-" not in branch_list and "+" not in branch_list: - del_count = update_branch_tag.count("> # <") - update_branch_tag = update_branch_tag.replace("> # <", "> <", del_count - 1) - else: - update_branch_tag = update_branch_tag.replace("> - <", "> <") - update_branch_tag = update_branch_tag.replace("> # <", "> <") - branch_tag = branch_html[line_feed_index + len(line_item) + 1:] - line_feed_index = branch_tag.find(line_item) - if line_feed_index == -1: - branch_tag = branch_tag.replace("> - <", "> <") - branch_tag = branch_tag.replace("> # <", "> <") - update_branch_tag += branch_tag - else: - loop_count = 0 - while line_feed_index + 1: - loop_count += 1 - if loop_count > 200: - continue - try: - update_branch_tag += branch_tag[:line_feed_index + len(line_item) + 1] - update_branch_tag = update_branch_tag.replace("> - <", "> <") - update_branch_tag = update_branch_tag.replace("> # <", "> <") - branch_tag = branch_tag[line_feed_index + len(line_item) + 1:] - line_feed_index = branch_tag.find(line_item) - except ValueError: - return "" - - branch_tag = branch_tag.replace("> - <", "> <") - update_branch_tag = update_branch_tag.replace("> # <", "> <") - update_branch_tag += branch_tag - return update_branch_tag - - def _multi_condition_modify_html(self, branch_html, branch_length, - condition_str_list, judge_index_list): - """ - 多条件修改代码块html - """ - line_item = ' ' - if branch_length % len(condition_str_list): - line_feed_index = branch_html.find(line_item) - update_branch_tag = branch_html[:line_feed_index] - update_branch_tag = update_branch_tag.replace("> - <", "> <") - branch_html = branch_html[line_feed_index:] - loop_count = 0 - while line_feed_index + 1: - loop_count += 1 - if loop_count > 200: - continue - line_feed_index = branch_html.count(line_item) - if line_feed_index > 1: - try: - line_feed_length = len(line_item) - branch_tag_before = branch_html[:line_feed_length] - branch_html = branch_html[line_feed_length:] - line_feed_index = branch_html.find(line_item) - branch_tag_after = branch_html[:line_feed_index] - branch_tag_after = branch_tag_after.replace("> - <", "> <") - branch_tag = branch_tag_before + branch_tag_after - except ValueError: - return "" - else: - branch_tag = branch_html - branch_tag = branch_tag.replace("> - <", "> <") - # 不再换行,索引为-1 - line_feed_index = -1 - - update_branch_tag += branch_tag - if line_feed_index == -1: - branch_html = "" - else: - branch_html = branch_html[line_feed_index:] - if branch_html != "": - branch_html = branch_html.replace("> - <", "> <") - update_branch_tag += branch_html - else: - branch_list = self.get_branch_data_by_tag(branch_html, True) - update_branch_tag = "" - end_count = -1 - try: - for index in judge_index_list: - branch_data = branch_list[:(index + 1) * 2] - # 要修改的分支数据列表长度 - branch_data_length = len(branch_data) - change_status = False - for count, branch in enumerate(branch_data, 1): - if count <= end_count: - continue - - end_count = count - branch = self._branch_replace(branch) - end_index = branch_html.find(branch) - branch_tag = branch_html[:end_index + 5] - if branch_data_length - count in [0, 1]: - if change_status: - continue - if branch == "> # <": - change_status = True - branch_tag = branch_tag.replace("> # <", "> * <") - elif branch == "> - <": - change_status = True - branch_tag = branch_tag.replace("> - <", "> * <") - update_branch_tag += branch_tag - branch_html = branch_html[end_index + 5:] - except (ValueError, TypeError): - return "" - update_branch_tag += branch_html - return update_branch_tag - def judge_branch_exists(self, file_path): """ 判断报告是否存在分支 @@ -918,6 +811,114 @@ class KeywordRegistration: pool.close() pool.join() + + def _check_if_branch_line(self, judge_key, sub_branch_line_list, + key_line, content, function_name): + """ + 确定if分支行号 + """ + if_branch_line = None + for branch_line in sub_branch_line_list: + if branch_line == key_line: + if_branch_line = key_line + break + # 获取分支行所在函数名 + branch_function_name = self.get_line_funcname( + branch_line, content) + # 关键字范围只在关键字所在函数之内 + branch_line_tag = self.get_tag(content, branch_line) + try: + if "{" not in branch_line_tag: + branch_line_tag = self.get_break_line_tag( + content, branch_line_tag, branch_line) + branch_line_source_code = self.get_source_code( + branch_line_tag) + if function_name == branch_function_name: + if judge_key in branch_line_source_code: + if_branch_line = branch_line + break + else: + break + except (ValueError, KeyError): + pass + + return if_branch_line + + def _multi_condition_modify_html(self, branch_html, branch_length, + condition_str_list, judge_index_list): + """ + 多条件修改代码块html + """ + line_item = ' ' + if branch_length % len(condition_str_list): + line_feed_index = branch_html.find(line_item) + update_branch_tag = branch_html[:line_feed_index] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + branch_html = branch_html[line_feed_index:] + loop_count = 0 + while line_feed_index + 1: + loop_count += 1 + if loop_count > 200: + continue + line_feed_index = branch_html.count(line_item) + if line_feed_index > 1: + try: + line_feed_length = len(line_item) + branch_tag_before = branch_html[:line_feed_length] + branch_html = branch_html[line_feed_length:] + line_feed_index = branch_html.find(line_item) + branch_tag_after = branch_html[:line_feed_index] + branch_tag_after = branch_tag_after.replace("> - <", "> <") + branch_tag = branch_tag_before + branch_tag_after + except ValueError: + return "" + else: + branch_tag = branch_html + branch_tag = branch_tag.replace("> - <", "> <") + # 不再换行,索引为-1 + line_feed_index = -1 + + update_branch_tag += branch_tag + if line_feed_index == -1: + branch_html = "" + else: + branch_html = branch_html[line_feed_index:] + if branch_html != "": + branch_html = branch_html.replace("> - <", "> <") + update_branch_tag += branch_html + else: + branch_list = self.get_branch_data_by_tag(branch_html, True) + update_branch_tag = "" + end_count = -1 + try: + for index in judge_index_list: + branch_data = branch_list[:(index + 1) * 2] + # 要修改的分支数据列表长度 + branch_data_length = len(branch_data) + change_status = False + for count, branch in enumerate(branch_data, 1): + if count <= end_count: + continue + + end_count = count + branch = self._branch_replace(branch) + end_index = branch_html.find(branch) + branch_tag = branch_html[:end_index + 5] + if branch_data_length - count in [0, 1]: + if change_status: + continue + if branch == "> # <": + change_status = True + branch_tag = branch_tag.replace("> # <", "> * <") + elif branch == "> - <": + change_status = True + branch_tag = branch_tag.replace("> - <", "> * <") + update_branch_tag += branch_tag + branch_html = branch_html[end_index + 5:] + except (ValueError, TypeError): + return "" + update_branch_tag += branch_html + return update_branch_tag def main(report_path): diff --git a/src/core/build/build_manager.py b/src/core/build/build_manager.py index 3c8ad7a..7f7b220 100644 --- a/src/core/build/build_manager.py +++ b/src/core/build/build_manager.py @@ -38,6 +38,25 @@ LOG = platform_logger("BuildManager") ############################################################################## class BuildManager(object): + @classmethod + def build_version(cls, project_root_path, product_form): + if BuildTestcases(project_root_path).build_version(product_form): + LOG.info("The version compiled successfully.") + build_result = True + else: + LOG.info("The version compilation failed, please modify.") + build_result = False + return build_result + + @classmethod + def build_gn_file(cls, project_root_path, product_form): + if BuildTestcases(project_root_path).build_gn_file(product_form): + LOG.info("The gn compiled successfully.") + build_result = True + else: + LOG.info("The gn compilation failed, please modify.") + build_result = False + return build_result @classmethod def _make_gn_file(cls, filepath, target_list): @@ -120,7 +139,52 @@ class BuildManager(object): LOG.info("Test case compilation failed, please modify.") return build_result - # 编译入口 + def build_testcases(self, project_root_path, param): + if not os.path.exists(project_root_path): + LOG.error("%s is not exists." % project_root_path) + return False + + LOG.info("--------------------------------------------------") + LOG.info("Building parameter:") + LOG.info("productform = %s" % param.productform) + LOG.info("testtype = %s" % str(param.testtype)) + LOG.info("partname_list = %s" % str(param.partname_list)) + LOG.info("testmodule = %s" % param.testmodule) + LOG.info("testsuit = %s" % param.testsuit) + LOG.info("testcase = %s" % param.testcase) + LOG.info("--------------------------------------------------") + + LOG.info("") + LOG.info("**************************************************") + LOG.info("*************** Start build testcases ************") + LOG.info("**************************************************") + LOG.info("") + + build_xts_result = True + build_result = True + if "partdeps" == param.partdeps: + LOG.info("**********************Start prebuild testcases****************************") + build_deps_files_result = self._compile_deps_files(project_root_path, param) + if build_deps_files_result: + self._compile_part_deps(project_root_path, param) + + if "acts" in param.testtype or "hats" in param.testtype or "hits" in param.testtype: + LOG.info("**********************Start build xts testcases****************************") + build_xts_result = self._compile_xts_test_cases(project_root_path, param) + else: + LOG.info("**********************Start build subsystem testcases****************************") + build_result = self._compile_testcases(project_root_path, param) + + LOG.info("") + LOG.info("**************************************************") + LOG.info("*************** Ended build testcases ************") + LOG.info("**************************************************") + LOG.info("") + + return build_result and build_xts_result + + # 编译入口 + def _compile_testcases(self, project_root_path, para): # 获取所有支持的产品,3.1Release版本为["DAYU","Hi3516DV300","ohos-arm64","ohos-sdk","rk3568"] all_product_list = scan_support_product() @@ -191,70 +255,5 @@ class BuildManager(object): return build_result - @classmethod - def build_version(cls, project_root_path, product_form): - if BuildTestcases(project_root_path).build_version(product_form): - LOG.info("The version compiled successfully.") - build_result = True - else: - LOG.info("The version compilation failed, please modify.") - build_result = False - return build_result - - @classmethod - def build_gn_file(cls, project_root_path, product_form): - if BuildTestcases(project_root_path).build_gn_file(product_form): - LOG.info("The gn compiled successfully.") - build_result = True - else: - LOG.info("The gn compilation failed, please modify.") - build_result = False - return build_result - - def build_testcases(self, project_root_path, param): - if not os.path.exists(project_root_path): - LOG.error("%s is not exists." % project_root_path) - return False - - LOG.info("--------------------------------------------------") - LOG.info("Building parameter:") - LOG.info("productform = %s" % param.productform) - LOG.info("testtype = %s" % str(param.testtype)) - LOG.info("partname_list = %s" % str(param.partname_list)) - LOG.info("testmodule = %s" % param.testmodule) - LOG.info("testsuit = %s" % param.testsuit) - LOG.info("testcase = %s" % param.testcase) - LOG.info("--------------------------------------------------") - - LOG.info("") - LOG.info("**************************************************") - LOG.info("*************** Start build testcases ************") - LOG.info("**************************************************") - LOG.info("") - - build_xts_result = True - build_result = True - if "partdeps" == param.partdeps: - LOG.info("**********************Start prebuild testcases****************************") - build_deps_files_result = self._compile_deps_files(project_root_path, param) - if build_deps_files_result: - self._compile_part_deps(project_root_path, param) - - if "acts" in param.testtype or "hats" in param.testtype or "hits" in param.testtype: - LOG.info("**********************Start build xts testcases****************************") - build_xts_result = self._compile_xts_test_cases(project_root_path, param) - else: - LOG.info("**********************Start build subsystem testcases****************************") - build_result = self._compile_testcases(project_root_path, param) - - LOG.info("") - LOG.info("**************************************************") - LOG.info("*************** Ended build testcases ************") - LOG.info("**************************************************") - LOG.info("") - - return build_result and build_xts_result - - ############################################################################## ############################################################################## diff --git a/src/core/build/build_testcases.py b/src/core/build/build_testcases.py index f67db64..f48101a 100644 --- a/src/core/build/build_testcases.py +++ b/src/core/build/build_testcases.py @@ -110,6 +110,109 @@ class BuildTestcases(object): if os.path.exists(xts_testcase_out_dir): shutil.rmtree(xts_testcase_out_dir) + def build_fuzz_testcases(self, para): + self._delete_testcase_dir(para.productform) + helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py") + command = [sys.executable, helper_path, 'make', + 'make_temp_test', para.productform] + if subprocess.call(command, shell=False) == 0: + build_result = True + else: + build_result = False + self._merge_testcase_dir(para.productform) + return build_result + + # 编译测试用例(编译命令拼接) + def build_testcases(self, productform, target): + command = [] + if self.is_build_example: + command.append("--gn-args") + command.append("build_example=true") + if isinstance(target, list): + for test in target: + command.append("--build-target") + command.append(test + "_test") + elif isinstance(target, str): + for test in target.split(','): + command.append("--build-target") + command.append(test) + + if productform == "rk3568": + pass + else: + command.append("--abi-type") + command.append("generic_generic_arm_64only") + command.append("--device-type") + command.append(get_output_path().split("/")[-1]) + command.append("--build-variant") + command.append("root") + command.append("--ccache") + self._delete_testcase_dir(productform) + build_result = self._execute_build_command(productform, command) + self._merge_testcase_dir(productform) + return build_result + + # 编译XTS测试用例 + def build_xts_testcases(self, para): + self._delete_xts_testcase_dir(para) + xts_build_command = [] + if para.productform == "rk3568": + False + else: + xts_build_test_command = ["--abi-type", "generic_generic_arm_64only", "--device-type", + get_output_path().split("/")[-1], "--build-variant", "root", "--gn-args", + "build_xts=true", "--export-para", "xts_suitename:" + para.testtype[0]] + if len(para.subsystem) > 0: + input_subsystem = ",".join(para.subsystem) + xts_build_test_command.append("--build-target") + xts_build_test_command.append(input_subsystem) + return False + if para.testsuit != "" and len(para.subsystem) == 0: + LOG.error("Please specify subsystem.") + return False + xts_build_command.extend(xts_build_test_command) + xts_build_command.append("--ccache") + build_result = self._execute_build_xts_command(para, xts_build_command) + return build_result + + + def build_deps_files(self, productform): + command = ["--ccache", "--gn-args", "pycache_enable=true", "--gn-args", + "check_deps=true", "--build-only-gn"] + if productform == "rk3568": + pass + else: + command.append("--abi-type") + command.append("generic_generic_arm_64only") + command.append("--device-type") + command.append(get_output_path().split("/")[-1]) + command.append("--build-variant") + command.append("root") + return self._execute_build_deps_files_command(productform, command) + + # 部件间依赖关系预处理,生成part_deps_info.json + def build_part_deps(self, para): + build_part_deps_result = self._execute_build_part_deps_command(para) + return build_part_deps_result + + def build_gn_file(self, productform): + command = [] + if self.is_build_example: + command.append("--gn-args") + command.append("build_example=true") + command.append("--build-only-gn") + command.append("--gn-args") + command.append(BUILD_TARGET_PLATFORM % productform) + return self._execute_build_command(productform, command) + + def build_version(self, productform): + command = [] + command.append("--build-target") + command.append("make_all") + command.append("--gn-args") + command.append(BUILD_TARGET_PLATFORM % productform) + return self._execute_build_command(productform, command) + def _delete_testcase_dir(self, productform): if is_open_source_product(productform): package_out_dir = os.path.join( @@ -299,108 +402,5 @@ class BuildTestcases(object): os.chdir(current_path) return build_result - def build_fuzz_testcases(self, para): - self._delete_testcase_dir(para.productform) - helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py") - command = [sys.executable, helper_path, 'make', - 'make_temp_test', para.productform] - if subprocess.call(command, shell=False) == 0: - build_result = True - else: - build_result = False - self._merge_testcase_dir(para.productform) - return build_result - - # 编译测试用例(编译命令拼接) - def build_testcases(self, productform, target): - command = [] - if self.is_build_example: - command.append("--gn-args") - command.append("build_example=true") - if isinstance(target, list): - for test in target: - command.append("--build-target") - command.append(test + "_test") - elif isinstance(target, str): - for test in target.split(','): - command.append("--build-target") - command.append(test) - - if productform == "rk3568": - pass - else: - command.append("--abi-type") - command.append("generic_generic_arm_64only") - command.append("--device-type") - command.append(get_output_path().split("/")[-1]) - command.append("--build-variant") - command.append("root") - command.append("--ccache") - self._delete_testcase_dir(productform) - build_result = self._execute_build_command(productform, command) - self._merge_testcase_dir(productform) - return build_result - - # 编译XTS测试用例 - def build_xts_testcases(self, para): - self._delete_xts_testcase_dir(para) - xts_build_command = [] - if para.productform == "rk3568": - False - else: - xts_build_test_command = ["--abi-type", "generic_generic_arm_64only", "--device-type", - get_output_path().split("/")[-1], "--build-variant", "root", "--gn-args", - "build_xts=true", "--export-para", "xts_suitename:" + para.testtype[0]] - if len(para.subsystem) > 0: - input_subsystem = ",".join(para.subsystem) - xts_build_test_command.append("--build-target") - xts_build_test_command.append(input_subsystem) - return False - if para.testsuit != "" and len(para.subsystem) == 0: - LOG.error("Please specify subsystem.") - return False - xts_build_command.extend(xts_build_test_command) - xts_build_command.append("--ccache") - build_result = self._execute_build_xts_command(para, xts_build_command) - return build_result - - - def build_deps_files(self, productform): - command = ["--ccache", "--gn-args", "pycache_enable=true", "--gn-args", - "check_deps=true", "--build-only-gn"] - if productform == "rk3568": - pass - else: - command.append("--abi-type") - command.append("generic_generic_arm_64only") - command.append("--device-type") - command.append(get_output_path().split("/")[-1]) - command.append("--build-variant") - command.append("root") - return self._execute_build_deps_files_command(productform, command) - - # 部件间依赖关系预处理,生成part_deps_info.json - def build_part_deps(self, para): - build_part_deps_result = self._execute_build_part_deps_command(para) - return build_part_deps_result - - def build_gn_file(self, productform): - command = [] - if self.is_build_example: - command.append("--gn-args") - command.append("build_example=true") - command.append("--build-only-gn") - command.append("--gn-args") - command.append(BUILD_TARGET_PLATFORM % productform) - return self._execute_build_command(productform, command) - - def build_version(self, productform): - command = [] - command.append("--build-target") - command.append("make_all") - command.append("--gn-args") - command.append(BUILD_TARGET_PLATFORM % productform) - return self._execute_build_command(productform, command) - ############################################################################## ############################################################################## diff --git a/src/core/build/select_targets.py b/src/core/build/select_targets.py index a411232..90cef6f 100755 --- a/src/core/build/select_targets.py +++ b/src/core/build/select_targets.py @@ -82,6 +82,54 @@ class SelectTargets(object): part_path_dic[part_name] = part_path_list return part_path_dic + def get_build_targets(self, productform, typelist, partlist, testmodule): + target_list = [] + + if productform == "" or len(typelist) == 0: + LOG.warning("Error: productform or typelist is empty.") + return [] + + if len(partlist) == 0 and testmodule != "": + LOG.warning( + "The part cannot be empty When the module is not empty.") + return [] + # productform不为空,typelist(test type[UT,MST,ST,PERF,ALL])不为空 + # partlist和testmodule为空,通过testtype获取部件列表 + if len(partlist) == 0 and testmodule == "": + target_list = self._get_target_list_by_type(productform, typelist) + return target_list + # productform不为空,typelist(test type[UT,MST,ST,PERF,ALL])不为空 + # partlist不为空,testmodule为空,通过testtype、partlist一起获取部件列表 + if len(partlist) != 0 and testmodule == "": + target_list = self._get_target_list_by_part(productform, typelist, + partlist) + return target_list + # productform不为空,typelist(test type[UT,MST,ST,PERF,ALL])不为空 + # partlist不为空,testmodule不为空,通过testtype、partlist、testmodule一起获取部件列表 + if len(partlist) != 0 and testmodule != "": + target_list = self._get_target_list_by_module(productform, + typelist, + partlist, + testmodule) + + return target_list + + # 通过infos_for_testfwk.json文件获取所有子部件信息编译目录信息: + # [{“部件名1”:[~/OpenHarmony/out/rk3568/module_list_files/部件名1]}] + # 然后遍历这些目录中的mlf文件,获取其中定义的label,返回label集合 + # 遍历时通过testmodule控制遍历的部件指定模块目录,如果不定义,则遍历子部件下面所有模块目录 + # 遍历时通过partlist控制遍历指定部件目录,如果不定义,则遍历infos_for_testfwk.json文件中定义的所有子部件目录 + def filter_build_targets(self, para): + productform = para.productform + typelist = para.testtype + partlist = para.partname_list + testmodule = para.testmodule + + print("partlist = %s" % str(partlist)) + target_list = self.get_build_targets(productform, typelist, + partlist, testmodule) + return target_list + def _get_target_list_from_path(self, typelist, check_path): target_list = [] if os.path.exists(check_path): @@ -146,54 +194,5 @@ class SelectTargets(object): target_list.extend(temp_list) return target_list - def get_build_targets(self, productform, typelist, partlist, testmodule): - target_list = [] - - if productform == "" or len(typelist) == 0: - LOG.warning("Error: productform or typelist is empty.") - return [] - - if len(partlist) == 0 and testmodule != "": - LOG.warning( - "The part cannot be empty When the module is not empty.") - return [] - # productform不为空,typelist(test type[UT,MST,ST,PERF,ALL])不为空 - # partlist和testmodule为空,通过testtype获取部件列表 - if len(partlist) == 0 and testmodule == "": - target_list = self._get_target_list_by_type(productform, typelist) - return target_list - # productform不为空,typelist(test type[UT,MST,ST,PERF,ALL])不为空 - # partlist不为空,testmodule为空,通过testtype、partlist一起获取部件列表 - if len(partlist) != 0 and testmodule == "": - target_list = self._get_target_list_by_part(productform, typelist, - partlist) - return target_list - # productform不为空,typelist(test type[UT,MST,ST,PERF,ALL])不为空 - # partlist不为空,testmodule不为空,通过testtype、partlist、testmodule一起获取部件列表 - if len(partlist) != 0 and testmodule != "": - target_list = self._get_target_list_by_module(productform, - typelist, - partlist, - testmodule) - - return target_list - - # 通过infos_for_testfwk.json文件获取所有子部件信息编译目录信息: - # [{“部件名1”:[~/OpenHarmony/out/rk3568/module_list_files/部件名1]}] - # 然后遍历这些目录中的mlf文件,获取其中定义的label,返回label集合 - # 遍历时通过testmodule控制遍历的部件指定模块目录,如果不定义,则遍历子部件下面所有模块目录 - # 遍历时通过partlist控制遍历指定部件目录,如果不定义,则遍历infos_for_testfwk.json文件中定义的所有子部件目录 - def filter_build_targets(self, para): - productform = para.productform - typelist = para.testtype - partlist = para.partname_list - testmodule = para.testmodule - - print("partlist = %s" % str(partlist)) - target_list = self.get_build_targets(productform, typelist, - partlist, testmodule) - return target_list - - ############################################################################## ############################################################################## diff --git a/src/core/command/console.py b/src/core/command/console.py index 84624f7..68f9a98 100755 --- a/src/core/command/console.py +++ b/src/core/command/console.py @@ -65,47 +65,6 @@ class Console(object): def __init__(self): pass - - def handler_ctrl_c(self, signalnum, frame): - pass - - def handler_ctrl_z(self, signalnum, frame): - pass - - def console(self, args): - """ - Main xDevice console providing user with the interface to interact - """ - EnvironmentManager() - if args is None or len(args) < 2: - self.wizard_dic = show_wizard_mode() - print(self.wizard_dic) - if self._build_version(self.wizard_dic["productform"]): - self._console() - else: - LOG.error("Build version failed, exit test framework.") - else: - self.command_parser(" ".join(args[1:])) - - # 命令执行总入口 - def _console(self): - if platform.system() != 'Windows': - signal.signal(signal.SIGTSTP, self.handler_ctrl_z) # ctrl+x linux - signal.signal(signal.SIGINT, self.handler_ctrl_c) # ctrl+c - - while True: - try: - # 获取用户命令输入 - usr_input = input(">>> ") - if usr_input == "": - continue - # 用户输入命令解析 - self.command_parser(usr_input) - except SystemExit: - LOG.info("Program exit normally!") - return - except (IOError, EOFError, KeyboardInterrupt) as error: - LOG.exception("Input Error: %s" % error) @staticmethod def _parse_combination_param(combination_value): @@ -323,49 +282,6 @@ class Console(object): return options, unparsed, valid_param - def command_parser(self, args): - try: - # 将用户输入的指令按空格拆分成字符串数组 - para_list = args.split() - options, _, valid_param = self.argument_parser(para_list) - if options is None or not valid_param: - LOG.warning("options is None.") - return - - # 根据命令行的命令选择不同的方法执行 - command = options.action - if command == "": - LOG.warning("action is empty.") - return - - if "productform" in self.wizard_dic.keys(): - productform = self.wizard_dic["productform"] - options.productform = productform - else: - productform = options.productform - - if command.startswith(ToolCommandType.TOOLCMD_KEY_HELP): - self._process_command_help(para_list) - elif command.startswith(ToolCommandType.TOOLCMD_KEY_SHOW): - self._process_command_show(para_list, productform) - elif command.startswith(ToolCommandType.TOOLCMD_KEY_GEN): - self._process_command_gen(command, options) - elif command.startswith(ToolCommandType.TOOLCMD_KEY_RUN): - # 保存原始控制命令 - options.current_raw_cmd = args - self._process_command_run(command, options) - elif command.startswith(ToolCommandType.TOOLCMD_KEY_QUIT): - self._process_command_quit(command) - elif command.startswith(ToolCommandType.TOOLCMD_KEY_LIST): - self._process_command_device(command) - elif command.startswith(ToolCommandType.TOOLCMD_KEY_VERSION): - self._process_command_version(command) - else: - print("The %s command is not supported." % command) - except (AttributeError, IOError, IndexError, ImportError, NameError, - RuntimeError, SystemError, TypeError, ValueError) as exception: - LOG.exception(exception, exc_info=False) - @classmethod def _params_pre_processing(cls, para_list): if len(para_list) <= 1 or ( @@ -477,6 +393,90 @@ class Console(object): product_form) return build_result + def handler_ctrl_c(self, signalnum, frame): + pass + + def handler_ctrl_z(self, signalnum, frame): + pass + + def console(self, args): + """ + Main xDevice console providing user with the interface to interact + """ + EnvironmentManager() + if args is None or len(args) < 2: + self.wizard_dic = show_wizard_mode() + print(self.wizard_dic) + if self._build_version(self.wizard_dic["productform"]): + self._console() + else: + LOG.error("Build version failed, exit test framework.") + else: + self.command_parser(" ".join(args[1:])) + + def command_parser(self, args): + try: + # 将用户输入的指令按空格拆分成字符串数组 + para_list = args.split() + options, _, valid_param = self.argument_parser(para_list) + if options is None or not valid_param: + LOG.warning("options is None.") + return + + # 根据命令行的命令选择不同的方法执行 + command = options.action + if command == "": + LOG.warning("action is empty.") + return + + if "productform" in self.wizard_dic.keys(): + productform = self.wizard_dic["productform"] + options.productform = productform + else: + productform = options.productform + + if command.startswith(ToolCommandType.TOOLCMD_KEY_HELP): + self._process_command_help(para_list) + elif command.startswith(ToolCommandType.TOOLCMD_KEY_SHOW): + self._process_command_show(para_list, productform) + elif command.startswith(ToolCommandType.TOOLCMD_KEY_GEN): + self._process_command_gen(command, options) + elif command.startswith(ToolCommandType.TOOLCMD_KEY_RUN): + # 保存原始控制命令 + options.current_raw_cmd = args + self._process_command_run(command, options) + elif command.startswith(ToolCommandType.TOOLCMD_KEY_QUIT): + self._process_command_quit(command) + elif command.startswith(ToolCommandType.TOOLCMD_KEY_LIST): + self._process_command_device(command) + elif command.startswith(ToolCommandType.TOOLCMD_KEY_VERSION): + self._process_command_version(command) + else: + print("The %s command is not supported." % command) + except (AttributeError, IOError, IndexError, ImportError, NameError, + RuntimeError, SystemError, TypeError, ValueError) as exception: + LOG.exception(exception, exc_info=False) + + # 命令执行总入口 + def _console(self): + if platform.system() != 'Windows': + signal.signal(signal.SIGTSTP, self.handler_ctrl_z) # ctrl+x linux + signal.signal(signal.SIGINT, self.handler_ctrl_c) # ctrl+c + + while True: + try: + # 获取用户命令输入 + usr_input = input(">>> ") + if usr_input == "": + continue + # 用户输入命令解析 + self.command_parser(usr_input) + except SystemExit: + LOG.info("Program exit normally!") + return + except (IOError, EOFError, KeyboardInterrupt) as error: + LOG.exception("Input Error: %s" % error) + @dataclass class ConfigConst(object): diff --git a/src/core/command/display.py b/src/core/command/display.py index b7f60fe..b8de6ef 100644 --- a/src/core/command/display.py +++ b/src/core/command/display.py @@ -201,7 +201,6 @@ def select_user_input(data_list): return select_item_value, select_item_index - # 选择productform def select_productform(): select_value = "phone" diff --git a/src/core/command/gen.py b/src/core/command/gen.py index 9a295e9..fc9ed92 100644 --- a/src/core/command/gen.py +++ b/src/core/command/gen.py @@ -28,7 +28,22 @@ MODES = stat.S_IWUSR | stat.S_IRUSR LOG = platform_logger("Gen") + class Gen(object): + @classmethod + def fuzz_dir_generation(cls, options): + helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py") + fuzz_path = os.path.join(sys.source_code_root_path, options.dirpath) + LOG.info("fuzz_path = %s" % fuzz_path) + if not os.path.exists(fuzz_path): + os.makedirs(fuzz_path) + LOG.info("make folder %s" % fuzz_path) + + command = [sys.executable, helper_path, 'generate', + options.fuzzername, fuzz_path] + LOG.info("command %s" % command) + subprocess.call(command, shell=False) + def process_command_gen(self, options): if (len(options.testtype) != 1) or (options.dirpath == "") or \ (options.fuzzername == ""): @@ -52,17 +67,3 @@ class Gen(object): for target in fuzzer_list: if target: gn_file.write("\"%s\",\n" % target) - - @classmethod - def fuzz_dir_generation(cls, options): - helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py") - fuzz_path = os.path.join(sys.source_code_root_path, options.dirpath) - LOG.info("fuzz_path = %s" % fuzz_path) - if not os.path.exists(fuzz_path): - os.makedirs(fuzz_path) - LOG.info("make folder %s" % fuzz_path) - - command = [sys.executable, helper_path, 'generate', - options.fuzzername, fuzz_path] - LOG.info("command %s" % command) - subprocess.call(command, shell=False) \ No newline at end of file diff --git a/src/core/command/run.py b/src/core/command/run.py index ee89702..10bb2a5 100644 --- a/src/core/command/run.py +++ b/src/core/command/run.py @@ -54,279 +54,6 @@ class Run(object): def get_history(self): return self.history_cmd_list - def process_command_run(self, command, options): - current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" ")))) - if options.coverage and platform.system() != "Windows": - if not options.pullgcda: - push_cov_path = os.path.join(sys.framework_root_dir, "localCoverage/push_coverage_so/push_coverage.py") - if os.path.exists(push_cov_path): - if str(options.testpart) == "[]" and str(options.subsystem) == "[]": - LOG.info("No subsystem or part input. Not push coverage so.") - elif str(options.testpart) != "[]" and str(options.subsystem) != "[]": - LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.") - else: - if str(options.testpart) != "[]": - param = str(options.testpart) - subprocess.run("python3 {} {} {}".format( - push_cov_path, "testpart", param), shell=True) - else: - param = str(options.subsystem) - subprocess.run("python3 {} {} {}".format( - push_cov_path, "subsystem", param), shell=True) - else: - print(f"{push_cov_path} not exists.") - - init_gcov_path = os.path.join(sys.framework_root_dir, "localCoverage/resident_service/init_gcov.py") - if os.path.exists(init_gcov_path): - subprocess.run("python3 %s command_str=%s" % ( - init_gcov_path, current_raw_cmd), shell=True) - else: - print(f"{init_gcov_path} not exists.") - - para = Parameter() - test_type_list = para.get_testtype_list(options.testtype) - if len(test_type_list) == 0: - LOG.error("The testtype parameter is incorrect.") - return - options.testtype = test_type_list - - parser = ParsePartsConfig(options.productform) - partname_list = parser.get_part_list( - options.subsystem, - options.testpart) - options.partname_list = partname_list - options.coverage_outpath = self.get_coverage_outpath(options) - - LOG.info("") - LOG.info("------------------------------------") - LOG.info("Input parameter:") - LOG.info("productform = %s" % options.productform) - LOG.info("testtype = %s" % str(options.testtype)) - LOG.info("subsystem = %s" % str(options.subsystem)) - LOG.info("testpart = %s" % str(options.testpart)) - LOG.info("testmodule = %s" % options.testmodule) - LOG.info("testsuit = %s" % options.testsuit) - LOG.info("testcase = %s" % options.testcase) - LOG.info("testlevel = %s" % options.testlevel) - LOG.info("testargs = %s" % options.testargs) - LOG.info("repeat = %s" % options.repeat) - LOG.info("retry = %s" % options.retry) - LOG.info("historylist = %s" % options.historylist) - LOG.info("runhistory = %s" % options.runhistory) - LOG.info("partname_list = %s" % str(options.partname_list)) - LOG.info("partdeps = %s" % options.partdeps) - LOG.info("------------------------------------") - LOG.info("") - - if not para.check_run_parameter(options): - LOG.error("Input parameter is incorrect.") - return - - current_time = datetime.datetime.now() - #记录命令运行历史 - need_record_history = False - cmd_record = { - "time" : str(current_time), - "raw_cmd" : options.current_raw_cmd, - "result" : "unknown", - "command": command, - "options": options - } - if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \ - or "--retry" in options.current_raw_cmd): - need_record_history = True - - #打印历史记录 - if options.historylist: - print("The latest command history is: %d" % len(self.history_cmd_list)) - for index in range(0, len(self.history_cmd_list)): - cmd_record = self.history_cmd_list[index] - print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"], - cmd_record["raw_cmd"], cmd_record["result"])) - return - #重新运行历史里的一条命令 - if options.runhistory > 0: - #如果记录大于10则认为非法 - if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list): - print("input history command[%d] out of range:", options.runhistory) - return - cmd_record = self.history_cmd_list[options.runhistory - 1] - print("run history command:", cmd_record["raw_cmd"]) - need_record_history = False - command = cmd_record["command"] - options = cmd_record["options"] - - if options.retry: - if len(self.history_cmd_list) <= 0: - LOG.info("No history command exsit") - return - history_cmd = self.history_cmd_list[-1] - command = history_cmd["command"] - options = history_cmd["options"] - from xdevice import Variables - latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") - tree = ElementTree.parse(latest_report_path) - root = tree.getroot() - has_failed_case = 0 - test_targets = {} - fail_list = [] - for child in root: - print(child.tag, ":", child.attrib) - for grand in child: - print(grand.tag, ":", grand.attrib) - for sub_child in grand: - if sub_child.tag == 'failure': - fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"] - fail_list.append(fail_case) - has_failed_case += 1 - break - test_targets["class"] = fail_list - setattr(options, "testargs", test_targets) - print("retry option:", options) - if has_failed_case > 0: - if not self._build_test_cases(options): - LOG.error("Build test cases failed.") - return - scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, - plugin_id=SchedulerType.SCHEDULER)[0] - scheduler.exec_command(command, options) - else: - LOG.info("No testcase to retry") - return - - if not self._build_test_cases(options): - LOG.error("Build test cases failed.") - return - - if "partdeps" == options.partdeps: - self.get_part_deps_list(options.productform, options.testpart) - options.testcases_path = self.get_external_deps_out_path(options.productform) - LOG.info("partdeps = %s" % options.partdeps) - - if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype: - test_dict = self.get_xts_test_dict(options) - options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype) - options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype) - else: - test_dict = self.get_test_dict(options) - - if not self._check_test_dictionary(test_dict): - LOG.error("The test file list is empty.") - return - if options.coverage and platform.system() != "Windows": - coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage") - if os.path.exists(coverage_path): - coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True) - coverage_process.communicate() - - if ("distributedtest" in options.testtype and - len(options.testtype) == 1): - from core.command.distribute_utils import get_test_case - from core.command.distribute_utils \ - import check_ditributetest_environment - from core.command.distribute_utils import make_device_info_file - from core.command.distribute_utils import make_reports - - local_time = time.localtime() - create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time) - start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time) - - if not check_ditributetest_environment(): - return - - output_test = get_test_case(test_dict["CXX"]) - if not output_test: - return - - result_rootpath = os.path.join(sys.framework_root_dir, - "reports", - create_time) - - log_path = os.path.join(result_rootpath, "log") - tmp_path = os.path.join(result_rootpath, "temp") - os.makedirs(log_path, exist_ok=True) - os.makedirs(tmp_path, exist_ok=True) - - Scheduler.start_task_log(log_path) - make_device_info_file(tmp_path) - - for case in output_test: - agent_target_name = case["agent_target_name"] - major_target_name = case["major_target_name"] - manager = DbinderTest(result_rootpath, case["suits_dir"]) - manager.setUp() - manager.test_distribute(major_target_name, agent_target_name, options) - manager.tearDown() - - make_reports(result_rootpath, start_time) - Scheduler.stop_task_logcat() - else: - options.testdict = test_dict - options.target_outpath = self.get_target_out_path( - options.productform) - - scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, - plugin_id=SchedulerType.SCHEDULER)[0] - if scheduler is None: - LOG.error("Can not find the scheduler plugin.") - else: - options.testcases_path = self.get_tests_out_path(options.productform) - options.resource_path = os.path.abspath(os.path.join( - sys.framework_root_dir, "..", "resource")) - if is_lite_product(options.productform, - sys.source_code_root_path): - if options.productform.find("wifiiot") != -1: - scheduler.update_test_type_in_source(".bin", - DeviceTestType.ctest_lite) - scheduler.update_ext_type_in_source("BIN", - DeviceTestType.ctest_lite) - else: - print("productform is not wifiiot") - scheduler.exec_command(command, options) - if need_record_history: - #读文件获取运行结果 - from xdevice import Variables - latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") - with open(latest_report_path) as report_file: - for report_line in report_file: - if "testsuites name=\"summary_report\"" in report_line: - result = report_line.replace("\n", "") - result = result.replace("", "") - cmd_record["result"] = result - break - if len(self.history_cmd_list) >= 10: - del self.history_cmd_list[0] - self.history_cmd_list.append(cmd_record) - - if "fuzztest" == options.testtype[0] and options.coverage is False: - report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0] - latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus") - if os.path.exists(latest_corpus_path): - shutil.rmtree(latest_corpus_path) - shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path) - - if options.coverage and platform.system() != "Windows": - pull_service_gcov_path = os.path.join( - sys.framework_root_dir, "localCoverage/resident_service/pull_service_gcda.py") - if os.path.exists(pull_service_gcov_path): - subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True) - else: - print(f"{pull_service_gcov_path} not exists.") - - if not options.pullgcda: - cov_main_file_path = os.path.join(sys.framework_root_dir, "localCoverage/coverage_tools.py") - testpart = ",".join(list(map(str, options.partname_list))) - if os.path.exists(cov_main_file_path): - subprocess.run("python3 %s testpart=%s" % ( - cov_main_file_path, testpart), shell=True) - else: - print(f"{cov_main_file_path} not exists.") - return - - ############################################################## - ############################################################## - @classmethod def get_target_out_path(cls, product_form): target_out_path = UserConfigManager().get_test_cases_dir() @@ -425,6 +152,275 @@ class Run(object): external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart) return external_deps_path_list + def process_command_run(self, command, options): + current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" ")))) + if options.coverage and platform.system() != "Windows": + if not options.pullgcda: + push_cov_path = os.path.join(sys.framework_root_dir, "localCoverage/push_coverage_so/push_coverage.py") + if os.path.exists(push_cov_path): + if str(options.testpart) == "[]" and str(options.subsystem) == "[]": + LOG.info("No subsystem or part input. Not push coverage so.") + elif str(options.testpart) != "[]" and str(options.subsystem) != "[]": + LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.") + else: + if str(options.testpart) != "[]": + param = str(options.testpart) + subprocess.run("python3 {} {} {}".format( + push_cov_path, "testpart", param), shell=True) + else: + param = str(options.subsystem) + subprocess.run("python3 {} {} {}".format( + push_cov_path, "subsystem", param), shell=True) + else: + print(f"{push_cov_path} not exists.") + + init_gcov_path = os.path.join(sys.framework_root_dir, "localCoverage/resident_service/init_gcov.py") + if os.path.exists(init_gcov_path): + subprocess.run("python3 %s command_str=%s" % ( + init_gcov_path, current_raw_cmd), shell=True) + else: + print(f"{init_gcov_path} not exists.") + + para = Parameter() + test_type_list = para.get_testtype_list(options.testtype) + if len(test_type_list) == 0: + LOG.error("The testtype parameter is incorrect.") + return + options.testtype = test_type_list + + parser = ParsePartsConfig(options.productform) + partname_list = parser.get_part_list( + options.subsystem, + options.testpart) + options.partname_list = partname_list + options.coverage_outpath = self.get_coverage_outpath(options) + + LOG.info("") + LOG.info("------------------------------------") + LOG.info("Input parameter:") + LOG.info("productform = %s" % options.productform) + LOG.info("testtype = %s" % str(options.testtype)) + LOG.info("subsystem = %s" % str(options.subsystem)) + LOG.info("testpart = %s" % str(options.testpart)) + LOG.info("testmodule = %s" % options.testmodule) + LOG.info("testsuit = %s" % options.testsuit) + LOG.info("testcase = %s" % options.testcase) + LOG.info("testlevel = %s" % options.testlevel) + LOG.info("testargs = %s" % options.testargs) + LOG.info("repeat = %s" % options.repeat) + LOG.info("retry = %s" % options.retry) + LOG.info("historylist = %s" % options.historylist) + LOG.info("runhistory = %s" % options.runhistory) + LOG.info("partname_list = %s" % str(options.partname_list)) + LOG.info("partdeps = %s" % options.partdeps) + LOG.info("------------------------------------") + LOG.info("") + + if not para.check_run_parameter(options): + LOG.error("Input parameter is incorrect.") + return + + current_time = datetime.datetime.now() + #记录命令运行历史 + need_record_history = False + cmd_record = { + "time" : str(current_time), + "raw_cmd" : options.current_raw_cmd, + "result" : "unknown", + "command": command, + "options": options + } + if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \ + or "--retry" in options.current_raw_cmd): + need_record_history = True + + #打印历史记录 + if options.historylist: + print("The latest command history is: %d" % len(self.history_cmd_list)) + for index, cmd_record in enumerate(self.history_cmd_list): + print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"], + cmd_record["raw_cmd"], cmd_record["result"])) + return + #重新运行历史里的一条命令 + if options.runhistory > 0: + #如果记录大于10则认为非法 + if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list): + print("input history command[%d] out of range:", options.runhistory) + return + cmd_record = self.history_cmd_list[options.runhistory - 1] + print("run history command:", cmd_record["raw_cmd"]) + need_record_history = False + command = cmd_record["command"] + options = cmd_record["options"] + + if options.retry: + if len(self.history_cmd_list) <= 0: + LOG.info("No history command exsit") + return + history_cmd = self.history_cmd_list[-1] + command = history_cmd["command"] + options = history_cmd["options"] + from xdevice import Variables + latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") + tree = ElementTree.parse(latest_report_path) + root = tree.getroot() + has_failed_case = 0 + test_targets = {} + fail_list = [] + for child in root: + print(child.tag, ":", child.attrib) + for grand in child: + print(grand.tag, ":", grand.attrib) + for sub_child in grand: + if sub_child.tag == 'failure': + fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"] + fail_list.append(fail_case) + has_failed_case += 1 + break + test_targets["class"] = fail_list + setattr(options, "testargs", test_targets) + print("retry option:", options) + if has_failed_case > 0: + if not self._build_test_cases(options): + LOG.error("Build test cases failed.") + return + scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, + plugin_id=SchedulerType.SCHEDULER)[0] + scheduler.exec_command(command, options) + else: + LOG.info("No testcase to retry") + return + + if not self._build_test_cases(options): + LOG.error("Build test cases failed.") + return + + if "partdeps" == options.partdeps: + self.get_part_deps_list(options.productform, options.testpart) + options.testcases_path = self.get_external_deps_out_path(options.productform) + LOG.info("partdeps = %s" % options.partdeps) + + if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype: + test_dict = self.get_xts_test_dict(options) + options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype) + options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype) + else: + test_dict = self.get_test_dict(options) + + if not self._check_test_dictionary(test_dict): + LOG.error("The test file list is empty.") + return + if options.coverage and platform.system() != "Windows": + coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage") + if os.path.exists(coverage_path): + coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True) + coverage_process.communicate() + + if ("distributedtest" in options.testtype and + len(options.testtype) == 1): + from core.command.distribute_utils import get_test_case + from core.command.distribute_utils \ + import check_ditributetest_environment + from core.command.distribute_utils import make_device_info_file + from core.command.distribute_utils import make_reports + + local_time = time.localtime() + create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time) + start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time) + + if not check_ditributetest_environment(): + return + + output_test = get_test_case(test_dict.get("CXX", None) + if not output_test: + return + + result_rootpath = os.path.join(sys.framework_root_dir, + "reports", + create_time) + + log_path = os.path.join(result_rootpath, "log") + tmp_path = os.path.join(result_rootpath, "temp") + os.makedirs(log_path, exist_ok=True) + os.makedirs(tmp_path, exist_ok=True) + + Scheduler.start_task_log(log_path) + make_device_info_file(tmp_path) + + for case in output_test: + agent_target_name = case["agent_target_name"] + major_target_name = case["major_target_name"] + manager = DbinderTest(result_rootpath, case["suits_dir"]) + manager.setUp() + manager.test_distribute(major_target_name, agent_target_name, options) + manager.tearDown() + + make_reports(result_rootpath, start_time) + Scheduler.stop_task_logcat() + else: + options.testdict = test_dict + options.target_outpath = self.get_target_out_path( + options.productform) + + scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, + plugin_id=SchedulerType.SCHEDULER)[0] + if scheduler is None: + LOG.error("Can not find the scheduler plugin.") + else: + options.testcases_path = self.get_tests_out_path(options.productform) + options.resource_path = os.path.abspath(os.path.join( + sys.framework_root_dir, "..", "resource")) + if is_lite_product(options.productform, + sys.source_code_root_path): + if options.productform.find("wifiiot") != -1: + scheduler.update_test_type_in_source(".bin", + DeviceTestType.ctest_lite) + scheduler.update_ext_type_in_source("BIN", + DeviceTestType.ctest_lite) + else: + print("productform is not wifiiot") + scheduler.exec_command(command, options) + if need_record_history: + #读文件获取运行结果 + from xdevice import Variables + latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") + with open(latest_report_path) as report_file: + for report_line in report_file: + if "testsuites name=\"summary_report\"" in report_line: + result = report_line.replace("\n", "") + result = result.replace("", "") + cmd_record["result"] = result + break + if len(self.history_cmd_list) >= 10: + del self.history_cmd_list[0] + self.history_cmd_list.append(cmd_record) + + if "fuzztest" == options.testtype[0] and options.coverage is False: + report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0] + latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus") + if os.path.exists(latest_corpus_path): + shutil.rmtree(latest_corpus_path) + shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path) + + if options.coverage and platform.system() != "Windows": + pull_service_gcov_path = os.path.join( + sys.framework_root_dir, "localCoverage/resident_service/pull_service_gcda.py") + if os.path.exists(pull_service_gcov_path): + subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True) + else: + print(f"{pull_service_gcov_path} not exists.") + + if not options.pullgcda: + cov_main_file_path = os.path.join(sys.framework_root_dir, "localCoverage/coverage_tools.py") + testpart = ",".join(list(map(str, options.partname_list))) + if os.path.exists(cov_main_file_path): + subprocess.run("python3 %s testpart=%s" % ( + cov_main_file_path, testpart), shell=True) + else: + print(f"{cov_main_file_path} not exists.") + return + def get_xts_test_dict(self, options): # 获取XTS测试用例编译结果路径 xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype) @@ -443,5 +439,3 @@ class Run(object): test_dict = TestCaseManager().get_test_files(test_case_path, options) return test_dict - - diff --git a/src/core/config/config_manager.py b/src/core/config/config_manager.py index 492a717..47690ac 100755 --- a/src/core/config/config_manager.py +++ b/src/core/config/config_manager.py @@ -175,6 +175,17 @@ class UserConfigManager(object): self.filepath = os.path.abspath( os.path.join(CONFIG_PATH, config_file)) + @classmethod + def content_strip(cls, content): + return content.strip() + + @classmethod + def _verify_duplicate(cls, items): + if len(set(items)) != len(items): + LOG.warning("find duplicate sn config, configuration incorrect") + return False + return True + def get_user_config_list(self, tag_name): data_dic = {} try: @@ -189,25 +200,6 @@ class UserConfigManager(object): LOG.error(("Parse %s fail!" % self.filepath) + xml_exception.args) return data_dic - @classmethod - def content_strip(cls, content): - return content.strip() - - @classmethod - def _verify_duplicate(cls, items): - if len(set(items)) != len(items): - LOG.warning("find duplicate sn config, configuration incorrect") - return False - return True - - def _handle_str(self, content): - config_list = map(self.content_strip, content.split(';')) - config_list = [item for item in config_list if item] - if config_list: - if not self._verify_duplicate(config_list): - return [] - return config_list - def get_sn_list(self): sn_select_list = [] try: @@ -280,6 +272,14 @@ class UserConfigManager(object): if testcase_path != "": testcase_path = os.path.abspath(testcase_path) return testcase_path + + def _handle_str(self, content): + config_list = map(self.content_strip, content.split(';')) + config_list = [item for item in config_list if item] + if config_list: + if not self._verify_duplicate(config_list): + return [] + return config_list class BuildConfigManager(object): diff --git a/src/core/config/resource_manager.py b/src/core/config/resource_manager.py index d5562ea..e0b469d 100755 --- a/src/core/config/resource_manager.py +++ b/src/core/config/resource_manager.py @@ -33,63 +33,6 @@ class ResourceManager(object): def __init__(self): pass - def get_resource_data_dic(self, testsuit_filepath): - resource_dir = "" - data_dic = {} - - target_name, _ = self._get_file_name_extension(testsuit_filepath) - xml_filepath = self.get_resource_xml_file_path(testsuit_filepath) - if not os.path.exists(xml_filepath): - return data_dic, resource_dir - - data_dic = self.get_resource_data(xml_filepath, target_name) - resource_dir = os.path.abspath(os.path.dirname(xml_filepath)) - return data_dic, resource_dir - - def get_resource_data(self, xml_filepath, target_name): - data_dic = {} - if os.path.exists(xml_filepath): - data_dic = self._parse_resource_test_xml_file( - xml_filepath, target_name) - return data_dic - - def _parse_resource_test_xml_file(self, filepath, targetname): - data_dic = {} - - node = self.find_node_by_target(filepath, targetname) - if node: - target_attrib_list = [] - target_attrib_list.append(node.attrib) - environment_data_list = [] - env_node = node.find("environment") - if env_node: - environment_data_list.append(env_node.attrib) - for element in env_node.findall("device"): - environment_data_list.append(element.attrib) - for option_element in element.findall("option"): - environment_data_list.append(option_element.attrib) - - preparer_data_list = [] - pre_node = node.find("preparer") - if pre_node: - preparer_data_list.append(pre_node.attrib) - for element in pre_node.findall("option"): - preparer_data_list.append(element.attrib) - - cleaner_data_list = [] - clr_node = node.find("cleaner") - if clr_node: - cleaner_data_list.append(clr_node.attrib) - for element in clr_node.findall("option"): - cleaner_data_list.append(element.attrib) - - data_dic["nodeattrib"] = target_attrib_list - data_dic["environment"] = environment_data_list - data_dic["preparer"] = preparer_data_list - data_dic["cleaner"] = cleaner_data_list - - return data_dic - @staticmethod def find_node_by_target(file_path, targe_tname): node = None @@ -109,9 +52,6 @@ class ResourceManager(object): xml_exception.args) return node - ########################################################################## - ########################################################################## - @classmethod def _get_file_name_extension(cls, filepath): _, fullname = os.path.split(filepath) @@ -127,64 +67,7 @@ class ResourceManager(object): if len(dir_name_list) > 1: dir_name = dir_name_list[-1] return dir_name - - def process_resource_file(self, resource_dir, preparer_list, device): - for item in preparer_list: - if "name" not in item.keys(): - continue - - if item["name"] == "push": - push_value = item["value"] - find_key = "->" - pos = push_value.find(find_key) - src = os.path.join(resource_dir, push_value[0:pos].strip()) - dst = push_value[pos + len(find_key):len(push_value)].strip() - src = src.replace("/", os.sep) - dir_name = self.get_dir_name(src) - if dir_name != "": - dst = dst.rstrip("/") + "/" + dir_name - device.execute_shell_command("mkdir -p %s" % dst) - device.push_file(src, dst) - elif item["name"] == "pull": - push_value = item["value"] - find_key = "->" - pos = push_value.find(find_key) - src = os.path.join(resource_dir, push_value[0:pos].strip()) - dst = push_value[pos + len(find_key):len(push_value)].strip() - device.pull_file(src, dst) - elif item["name"] == "shell": - command = item["value"].strip() - device.execute_shell_command(command) - else: - command = item["name"] + " " + item["value"] - command = command.strip() - device.connector_command(command) - - def lite_process_resource_file(self, resource_dir, preparer_list): - for item in preparer_list: - if "name" not in item.keys(): - continue - - if item["name"] == "push": - copy_value = item["value"] - find_key = "->" - pos = copy_value.find(find_key) - src = os.path.join(resource_dir, copy_value[0:pos].strip()) - dst = copy_value[pos + len(find_key):len(copy_value)].strip() - shutil.copy(src, dst) - - elif item["name"] == "pull": - copy_value = item["value"] - find_key = "->" - pos = copy_value.find(find_key) - src = os.path.join(resource_dir, copy_value[0:pos].strip()) - dst = copy_value[pos + len(find_key):len(copy_value)].strip() - shutil.copyfile(dst, src) - else: - command = item["name"] + " " + item["value"] - command = command.strip() - self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test) - + @classmethod def get_env_data(cls, environment_list): env_data_dic = {} @@ -254,6 +137,124 @@ class ResourceManager(object): curr_timeout = node_item_dic["timeout"] return curr_timeout + def get_resource_data_dic(self, testsuit_filepath): + resource_dir = "" + data_dic = {} + + target_name, _ = self._get_file_name_extension(testsuit_filepath) + xml_filepath = self.get_resource_xml_file_path(testsuit_filepath) + if not os.path.exists(xml_filepath): + return data_dic, resource_dir + + data_dic = self.get_resource_data(xml_filepath, target_name) + resource_dir = os.path.abspath(os.path.dirname(xml_filepath)) + return data_dic, resource_dir + + def get_resource_data(self, xml_filepath, target_name): + data_dic = {} + if os.path.exists(xml_filepath): + data_dic = self._parse_resource_test_xml_file( + xml_filepath, target_name) + return data_dic + + def _parse_resource_test_xml_file(self, filepath, targetname): + data_dic = {} + + node = self.find_node_by_target(filepath, targetname) + if node: + target_attrib_list = [] + target_attrib_list.append(node.attrib) + environment_data_list = [] + env_node = node.find("environment") + if env_node: + environment_data_list.append(env_node.attrib) + for element in env_node.findall("device"): + environment_data_list.append(element.attrib) + for option_element in element.findall("option"): + environment_data_list.append(option_element.attrib) + + preparer_data_list = [] + pre_node = node.find("preparer") + if pre_node: + preparer_data_list.append(pre_node.attrib) + for element in pre_node.findall("option"): + preparer_data_list.append(element.attrib) + + cleaner_data_list = [] + clr_node = node.find("cleaner") + if clr_node: + cleaner_data_list.append(clr_node.attrib) + for element in clr_node.findall("option"): + cleaner_data_list.append(element.attrib) + + data_dic["nodeattrib"] = target_attrib_list + data_dic["environment"] = environment_data_list + data_dic["preparer"] = preparer_data_list + data_dic["cleaner"] = cleaner_data_list + + return data_dic + + + ########################################################################## + ########################################################################## + + def process_resource_file(self, resource_dir, preparer_list, device): + for item in preparer_list: + if "name" not in item.keys(): + continue + + if item["name"] == "push": + push_value = item["value"] + find_key = "->" + pos = push_value.find(find_key) + src = os.path.join(resource_dir, push_value[0:pos].strip()) + dst = push_value[pos + len(find_key):len(push_value)].strip() + src = src.replace("/", os.sep) + dir_name = self.get_dir_name(src) + if dir_name != "": + dst = dst.rstrip("/") + "/" + dir_name + device.execute_shell_command("mkdir -p %s" % dst) + device.push_file(src, dst) + elif item["name"] == "pull": + push_value = item["value"] + find_key = "->" + pos = push_value.find(find_key) + src = os.path.join(resource_dir, push_value[0:pos].strip()) + dst = push_value[pos + len(find_key):len(push_value)].strip() + device.pull_file(src, dst) + elif item["name"] == "shell": + command = item["value"].strip() + device.execute_shell_command(command) + else: + command = item["name"] + " " + item["value"] + command = command.strip() + device.connector_command(command) + + def lite_process_resource_file(self, resource_dir, preparer_list): + for item in preparer_list: + if "name" not in item.keys(): + continue + + if item["name"] == "push": + copy_value = item["value"] + find_key = "->" + pos = copy_value.find(find_key) + src = os.path.join(resource_dir, copy_value[0:pos].strip()) + dst = copy_value[pos + len(find_key):len(copy_value)].strip() + shutil.copy(src, dst) + + elif item["name"] == "pull": + copy_value = item["value"] + find_key = "->" + pos = copy_value.find(find_key) + src = os.path.join(resource_dir, copy_value[0:pos].strip()) + dst = copy_value[pos + len(find_key):len(copy_value)].strip() + shutil.copyfile(dst, src) + else: + command = item["name"] + " " + item["value"] + command = command.strip() + self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test) + def get_environment_data(self, data_dic): env_data_dic = {} if "environment" in data_dic.keys(): diff --git a/src/core/driver/drivers.py b/src/core/driver/drivers.py index c27b902..44ecc97 100644 --- a/src/core/driver/drivers.py +++ b/src/core/driver/drivers.py @@ -539,6 +539,31 @@ class CppTestDriver(IDriver): config = None result = "" + @staticmethod + def _alter_init(name): + with open(name, "rb") as f: + lines = f.read() + str_content = lines.decode("utf-8") + + pattern_sharp = '^\s*#.*$(\n|\r\n)' + pattern_star = '/\*.*?\*/(\n|\r\n)+' + pattern_xml = '(\n|\r\n)+' + + if re.match(pattern_sharp, str_content, flags=re.M): + striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) + elif re.findall(pattern_star, str_content, flags=re.S): + striped_content = re.sub(pattern_star, '', str_content, flags=re.S) + elif re.findall(pattern_xml, str_content, flags=re.S): + striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) + else: + striped_content = str_content + + striped_bt = striped_content.encode("utf-8") + if os.path.exists(name): + os.remove(name) + with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: + f.write(striped_bt) + def __check_environment__(self, device_options): if len(device_options) == 1 and device_options[0].label is None: return True @@ -714,31 +739,6 @@ class CppTestDriver(IDriver): resource_dir, self.config.device) - @staticmethod - def _alter_init(name): - with open(name, "rb") as f: - lines = f.read() - str_content = lines.decode("utf-8") - - pattern_sharp = '^\s*#.*$(\n|\r\n)' - pattern_star = '/\*.*?\*/(\n|\r\n)+' - pattern_xml = '(\n|\r\n)+' - - if re.match(pattern_sharp, str_content, flags=re.M): - striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) - elif re.findall(pattern_star, str_content, flags=re.S): - striped_content = re.sub(pattern_star, '', str_content, flags=re.S) - elif re.findall(pattern_xml, str_content, flags=re.S): - striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) - else: - striped_content = str_content - - striped_bt = striped_content.encode("utf-8") - if os.path.exists(name): - os.remove(name) - with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: - f.write(striped_bt) - def _push_corpus_cov_if_exist(self, suite_file): corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep) cov_file = os.path.join( @@ -838,6 +838,124 @@ class JSUnitTestDriver(IDriver): self.hilog = None self.hilog_proc = None + @staticmethod + def _get_acts_test_para(testcase, + testlevel, + testtype, + target_test_path, + suite_file, + filename): + if "actstest" == testtype[0]: + test_para = (" --actstest_out_format=json" + " --actstest_out=%s%s.json") % ( + target_test_path, filename) + return test_para + + if "" != testcase and "" == testlevel: + test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) + elif "" == testcase and "" != testlevel: + level_para = get_level_para_string(testlevel) + test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) + else: + test_para = "" + return test_para + + @staticmethod + def _get_hats_test_para(testcase, + testlevel, + testtype, + target_test_path, + suite_file, + filename): + if "hatstest" == testtype[0]: + test_hats_para = (" --hatstest_out_format=json" + " --hatstest_out=%s%s.json") % ( + target_test_path, filename) + return test_hats_para + + if "" != testcase and "" == testlevel: + test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) + elif "" == testcase and "" != testlevel: + level_para = get_level_para_string(testlevel) + test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) + else: + test_hats_para = "" + return test_hats_para + + @classmethod + def _get_json_shell_timeout(cls, json_filepath): + test_timeout = 0 + try: + with open(json_filepath, 'r') as json_file: + data_dic = json.load(json_file) + if not data_dic: + return test_timeout + else: + if "driver" in data_dic.keys(): + driver_dict = data_dic.get("driver") + if driver_dict and "test-timeout" in driver_dict.keys(): + test_timeout = int(driver_dict["shell-timeout"]) / 1000 + return test_timeout + except JSONDecodeError: + return test_timeout + finally: + print(" get json shell timeout finally") + + @staticmethod + def _get_package_and_ability_name(hap_filepath): + package_name = "" + ability_name = "" + if os.path.exists(hap_filepath): + filename = os.path.basename(hap_filepath) + + # unzip the hap file + hap_bak_path = os.path.abspath(os.path.join( + os.path.dirname(hap_filepath), + "%s.bak" % filename)) + zf_desc = zipfile.ZipFile(hap_filepath) + try: + zf_desc.extractall(path=hap_bak_path) + except RuntimeError as error: + print("Unzip error: ", hap_bak_path) + zf_desc.close() + + # verify config.json file + app_profile_path = os.path.join(hap_bak_path, "config.json") + if not os.path.exists(app_profile_path): + print("file %s not exist" % app_profile_path) + return package_name, ability_name + + if os.path.isdir(app_profile_path): + print("%s is a folder, and not a file" % app_profile_path) + return package_name, ability_name + + # get package_name and ability_name value + load_dict = {} + with open(app_profile_path, 'r') as load_f: + load_dict = json.load(load_f) + profile_list = load_dict.values() + for profile in profile_list: + package_name = profile.get("package") + if not package_name: + continue + abilities = profile.get("abilities") + for abilitie in abilities: + abilities_name = abilitie.get("name") + if abilities_name.startswith("."): + ability_name = package_name + abilities_name[ + abilities_name.find("."):] + else: + ability_name = abilities_name + break + break + + # delete hap_bak_path + if os.path.exists(hap_bak_path): + shutil.rmtree(hap_bak_path) + else: + print("file %s not exist" % hap_filepath) + return package_name, ability_name + def __check_environment__(self, device_options): pass @@ -1053,124 +1171,6 @@ class JSUnitTestDriver(IDriver): _sleep_according_to_result(return_message) return return_message - @staticmethod - def _get_acts_test_para(testcase, - testlevel, - testtype, - target_test_path, - suite_file, - filename): - if "actstest" == testtype[0]: - test_para = (" --actstest_out_format=json" - " --actstest_out=%s%s.json") % ( - target_test_path, filename) - return test_para - - if "" != testcase and "" == testlevel: - test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) - elif "" == testcase and "" != testlevel: - level_para = get_level_para_string(testlevel) - test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) - else: - test_para = "" - return test_para - - @staticmethod - def _get_hats_test_para(testcase, - testlevel, - testtype, - target_test_path, - suite_file, - filename): - if "hatstest" == testtype[0]: - test_hats_para = (" --hatstest_out_format=json" - " --hatstest_out=%s%s.json") % ( - target_test_path, filename) - return test_hats_para - - if "" != testcase and "" == testlevel: - test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) - elif "" == testcase and "" != testlevel: - level_para = get_level_para_string(testlevel) - test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) - else: - test_hats_para = "" - return test_hats_para - - @classmethod - def _get_json_shell_timeout(cls, json_filepath): - test_timeout = 0 - try: - with open(json_filepath, 'r') as json_file: - data_dic = json.load(json_file) - if not data_dic: - return test_timeout - else: - if "driver" in data_dic.keys(): - driver_dict = data_dic.get("driver") - if driver_dict and "test-timeout" in driver_dict.keys(): - test_timeout = int(driver_dict["shell-timeout"]) / 1000 - return test_timeout - except JSONDecodeError: - return test_timeout - finally: - print(" get json shell timeout finally") - - @staticmethod - def _get_package_and_ability_name(hap_filepath): - package_name = "" - ability_name = "" - if os.path.exists(hap_filepath): - filename = os.path.basename(hap_filepath) - - # unzip the hap file - hap_bak_path = os.path.abspath(os.path.join( - os.path.dirname(hap_filepath), - "%s.bak" % filename)) - zf_desc = zipfile.ZipFile(hap_filepath) - try: - zf_desc.extractall(path=hap_bak_path) - except RuntimeError as error: - print("Unzip error: ", hap_bak_path) - zf_desc.close() - - # verify config.json file - app_profile_path = os.path.join(hap_bak_path, "config.json") - if not os.path.exists(app_profile_path): - print("file %s not exist" % app_profile_path) - return package_name, ability_name - - if os.path.isdir(app_profile_path): - print("%s is a folder, and not a file" % app_profile_path) - return package_name, ability_name - - # get package_name and ability_name value - load_dict = {} - with open(app_profile_path, 'r') as load_f: - load_dict = json.load(load_f) - profile_list = load_dict.values() - for profile in profile_list: - package_name = profile.get("package") - if not package_name: - continue - abilities = profile.get("abilities") - for abilitie in abilities: - abilities_name = abilitie.get("name") - if abilities_name.startswith("."): - ability_name = package_name + abilities_name[ - abilities_name.find("."):] - else: - ability_name = abilities_name - break - break - - # delete hap_bak_path - if os.path.exists(hap_bak_path): - shutil.rmtree(hap_bak_path) - else: - print("file %s not exist" % hap_filepath) - return package_name, ability_name - @Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) class OHRustTestDriver(IDriver): diff --git a/src/core/driver/lite_driver.py b/src/core/driver/lite_driver.py index 024db50..6343690 100755 --- a/src/core/driver/lite_driver.py +++ b/src/core/driver/lite_driver.py @@ -124,6 +124,23 @@ class LiteUnitTest(IDriver): return self.log.info("lite device execute request success") + def __result__(self): + pass + + def show_help_info(self): + """ + show help info. + """ + self.log.info("this is test driver for cpp test") + return + + def show_driver_info(self): + """ + show driver info. + """ + self.log.info("this is test driver for cpp test") + return + def _mount_nfs_server(self): #before execute each suits bin, mount nfs self.mnt_cmd = "mount {}".format(UserConfigManager().get_user_config( @@ -287,23 +304,6 @@ class LiteUnitTest(IDriver): time.sleep(5) return False - def show_help_info(self): - """ - show help info. - """ - self.log.info("this is test driver for cpp test") - return - - def show_driver_info(self): - """ - show driver info. - """ - self.log.info("this is test driver for cpp test") - return - - def __result__(self): - pass - @Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite) class CTestDriver(IDriver): @@ -365,6 +365,9 @@ class CTestDriver(IDriver): self.error_message, report_name) + def __result__(self): + return self.result if os.path.exists(self.result) else "" + def _run_ctest(self, source=None, request=None): if not source: LOG.error("Error: %s don't exist." % source, error_no="00101") @@ -435,10 +438,6 @@ class CTestDriver(IDriver): reset_cmd = [int(item, 16) for item in reset_cmd] return reset_cmd - def __result__(self): - return self.result if os.path.exists(self.result) else "" - - @Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite) class JSUnitTestLiteDriver(IDriver): @@ -457,23 +456,7 @@ class JSUnitTestLiteDriver(IDriver): def __check_config__(self, config): pass - - def _get_driver_config(self, json_config): - bundle_name = get_config_value('bundle-name', - json_config.get_driver(), False) - if not bundle_name: - raise ParamError("Can't find bundle-name in config file.", - error_no="00108") - else: - self.config.bundle_name = bundle_name - - ability = get_config_value('ability', - json_config.get_driver(), False) - if not ability: - self.config.ability = "default" - else: - self.config.ability = ability - + def __execute__(self, request): try: LOG.debug("Start execute xdevice extension JSUnit Test") @@ -536,6 +519,25 @@ class JSUnitTestLiteDriver(IDriver): self.config.device.close() + def __result__(self): + return self.result if os.path.exists(self.result) else "" + + def _get_driver_config(self, json_config): + bundle_name = get_config_value('bundle-name', + json_config.get_driver(), False) + if not bundle_name: + raise ParamError("Can't find bundle-name in config file.", + error_no="00108") + else: + self.config.bundle_name = bundle_name + + ability = get_config_value('ability', + json_config.get_driver(), False) + if not ability: + self.config.ability = "default" + else: + self.config.ability = ability + def _run_jsunit(self, request): parser_instances = [] parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite) @@ -564,5 +566,4 @@ class JSUnitTestLiteDriver(IDriver): "\n".join(result.split("\n")[0:-1]), "\n")) file_name.flush() - def __result__(self): - return self.result if os.path.exists(self.result) else "" \ No newline at end of file + \ No newline at end of file diff --git a/src/core/driver/openharmony.py b/src/core/driver/openharmony.py index bc7e23e..fd0ea8f 100644 --- a/src/core/driver/openharmony.py +++ b/src/core/driver/openharmony.py @@ -123,20 +123,11 @@ class OHJSUnitTestRunner: self.finished_observer.notify_task_finished() self.retry_times -= 1 - def _get_shell_handler(self, listener): - parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit) - if parsers: - parsers = parsers[:1] - parser_instances = [] - for parser in parsers: - parser_instance = parser.__class__() - parser_instance.suites_name = self.suites_name - parser_instance.listeners = listener - parser_instance.runner = self - parser_instances.append(parser_instance) - self.finished_observer = parser_instance - handler = ShellHandler(parser_instances) - return handler + def get_oh_test_runner_path(self): + if self.compile_mode == "esmodule": + return "/ets/testrunner/OpenHarmonyTestRunner" + else: + return "OpenHarmonyTestRunner" def add_arg(self, name, value): if not name or not value: @@ -157,6 +148,21 @@ class OHJSUnitTestRunner: else: args_commands = "%s -s %s %s " % (args_commands, key, value) return args_commands + + def _get_shell_handler(self, listener): + parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit) + if parsers: + parsers = parsers[:1] + parser_instances = [] + for parser in parsers: + parser_instance = parser.__class__() + parser_instance.suites_name = self.suites_name + parser_instance.listeners = listener + parser_instance.runner = self + parser_instances.append(parser_instance) + self.finished_observer = parser_instance + handler = ShellHandler(parser_instances) + return handler def _get_run_command(self): command = "" @@ -191,12 +197,6 @@ class OHJSUnitTestRunner: return command - def get_oh_test_runner_path(self): - if self.compile_mode == "esmodule": - return "/ets/testrunner/OpenHarmonyTestRunner" - else: - return "OpenHarmonyTestRunner" - @Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test) class OHJSUnitTestDriver(IDriver): @@ -278,6 +278,9 @@ class OHJSUnitTestDriver(IDriver): request.config.report_path, self.result, self.error_message) update_xml(request.root.source.source_file, self.result) + def __result__(self): + return self.result if os.path.exists(self.result) else "" + def _run_oh_jsunit(self, config_file, request): try: if not os.path.exists(config_file): @@ -501,7 +504,3 @@ class OHJSUnitTestDriver(IDriver): stop_catch_device_log(self.log_proc) self.config.device.device_log_collector.\ stop_catch_device_log(self.hilog_proc) - - def __result__(self): - return self.result if os.path.exists(self.result) else "" - diff --git a/src/core/testcase/testcase_manager.py b/src/core/testcase/testcase_manager.py index 7d91ad2..e32b718 100644 --- a/src/core/testcase/testcase_manager.py +++ b/src/core/testcase/testcase_manager.py @@ -47,6 +47,137 @@ FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"] class TestCaseManager(object): + @classmethod + def get_valid_suite_file(cls, test_case_out_path, suite_file, options): + partlist = options.partname_list + testmodule = options.testmodule + testsuit = options.testsuit + + if not suite_file.startswith(test_case_out_path): + return False + + if testsuit != "": + short_name, _ = os.path.splitext(os.path.basename(suite_file)) + testsuit_list = testsuit.split(',') + for test in testsuit_list: + if short_name.startswith(test) or \ + testsuit.startswith(short_name): + return True + return False + + is_valid_status = False + suitfile_subpath = suite_file.replace(test_case_out_path, "") + suitfile_subpath = suitfile_subpath.strip(os.sep) + if len(partlist) == 0: + if testmodule != "": + temp_list = suitfile_subpath.split(os.sep) + if len(temp_list) > 2 and testmodule == temp_list[1]: + is_valid_status = True + else: + is_valid_status = True + else: + for partname in partlist: + if testmodule != "": + if suitfile_subpath.startswith( + partname + os.sep + testmodule + os.sep): + is_valid_status = True + break + else: + if suitfile_subpath.startswith(partname + os.sep): + is_valid_status = True + break + return is_valid_status + + @classmethod + def check_python_test_file(cls, suite_file): + if suite_file.endswith(".py"): + filename = os.path.basename(suite_file) + if filename.startswith("test_"): + return True + return False + + @classmethod + def check_hap_test_file(cls, hap_file_path): + try: + if hap_file_path.endswith(".hap"): + json_file_path = hap_file_path.replace(".hap", ".json") + if os.path.exists(json_file_path): + with open(json_file_path, 'r') as json_file: + data_dic = json.load(json_file) + if not data_dic: + return False + else: + if "kits" in data_dic.keys(): + kits_list = data_dic.get("kits") + if len(kits_list) > 0: + for kits_dict in kits_list: + if "test-file-name" not in kits_dict.keys(): + continue + else: + return True + else: + return False + return False + except JSONDecodeError: + return False + finally: + print(" check hap test file finally") + + @classmethod + def get_hap_test_driver(cls, hap_file_path): + data_dic = cls.get_hap_json(hap_file_path) + if not data_dic: + return "" + else: + if "driver" in data_dic.keys(): + driver_dict = data_dic.get("driver") + if bool(driver_dict): + driver_type = driver_dict.get("type") + return driver_type + else: + LOG.error("%s has not set driver." % hap_file_path) + return "" + else: + return "" + + @classmethod + def get_hap_json(cls, hap_file_path): + if hap_file_path.endswith(".hap"): + json_file_path = hap_file_path.replace(".hap", ".json") + if os.path.exists(json_file_path): + with open(json_file_path, 'r') as json_file: + data_dic = json.load(json_file) + return data_dic + else: + return {} + else: + return {} + + @classmethod + def get_hap_part_json(cls, hap_file_path): + if hap_file_path.endswith(".hap"): + json_file_path = hap_file_path.replace(".hap", ".moduleInfo") + if os.path.exists(json_file_path): + with open(json_file_path, 'r') as json_file: + data_dic = json.load(json_file) + return data_dic + else: + return {} + else: + return {} + + @classmethod + def get_part_name_test_file(cls, hap_file_path): + data_dic = cls.get_hap_part_json(hap_file_path) + if not data_dic: + return "" + else: + if "part" in data_dic.keys(): + part_name = data_dic["part"] + return part_name + else: + return "" + def get_test_files(self, test_case_path, options): LOG.info("test case path: " + test_case_path) LOG.info("test type list: " + str(options.testtype)) @@ -219,134 +350,3 @@ class TestCaseManager(object): if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest": xts_suit_file_dic.get("JST").append(xts_suite_file) return xts_suit_file_dic - - @classmethod - def get_valid_suite_file(cls, test_case_out_path, suite_file, options): - partlist = options.partname_list - testmodule = options.testmodule - testsuit = options.testsuit - - if not suite_file.startswith(test_case_out_path): - return False - - if testsuit != "": - short_name, _ = os.path.splitext(os.path.basename(suite_file)) - testsuit_list = testsuit.split(',') - for test in testsuit_list: - if short_name.startswith(test) or \ - testsuit.startswith(short_name): - return True - return False - - is_valid_status = False - suitfile_subpath = suite_file.replace(test_case_out_path, "") - suitfile_subpath = suitfile_subpath.strip(os.sep) - if len(partlist) == 0: - if testmodule != "": - temp_list = suitfile_subpath.split(os.sep) - if len(temp_list) > 2 and testmodule == temp_list[1]: - is_valid_status = True - else: - is_valid_status = True - else: - for partname in partlist: - if testmodule != "": - if suitfile_subpath.startswith( - partname + os.sep + testmodule + os.sep): - is_valid_status = True - break - else: - if suitfile_subpath.startswith(partname + os.sep): - is_valid_status = True - break - return is_valid_status - - @classmethod - def check_python_test_file(cls, suite_file): - if suite_file.endswith(".py"): - filename = os.path.basename(suite_file) - if filename.startswith("test_"): - return True - return False - - @classmethod - def check_hap_test_file(cls, hap_file_path): - try: - if hap_file_path.endswith(".hap"): - json_file_path = hap_file_path.replace(".hap", ".json") - if os.path.exists(json_file_path): - with open(json_file_path, 'r') as json_file: - data_dic = json.load(json_file) - if not data_dic: - return False - else: - if "kits" in data_dic.keys(): - kits_list = data_dic.get("kits") - if len(kits_list) > 0: - for kits_dict in kits_list: - if "test-file-name" not in kits_dict.keys(): - continue - else: - return True - else: - return False - return False - except JSONDecodeError: - return False - finally: - print(" check hap test file finally") - - @classmethod - def get_hap_test_driver(cls, hap_file_path): - data_dic = cls.get_hap_json(hap_file_path) - if not data_dic: - return "" - else: - if "driver" in data_dic.keys(): - driver_dict = data_dic.get("driver") - if bool(driver_dict): - driver_type = driver_dict.get("type") - return driver_type - else: - LOG.error("%s has not set driver." % hap_file_path) - return "" - else: - return "" - - @classmethod - def get_hap_json(cls, hap_file_path): - if hap_file_path.endswith(".hap"): - json_file_path = hap_file_path.replace(".hap", ".json") - if os.path.exists(json_file_path): - with open(json_file_path, 'r') as json_file: - data_dic = json.load(json_file) - return data_dic - else: - return {} - else: - return {} - - @classmethod - def get_hap_part_json(cls, hap_file_path): - if hap_file_path.endswith(".hap"): - json_file_path = hap_file_path.replace(".hap", ".moduleInfo") - if os.path.exists(json_file_path): - with open(json_file_path, 'r') as json_file: - data_dic = json.load(json_file) - return data_dic - else: - return {} - else: - return {} - - @classmethod - def get_part_name_test_file(cls, hap_file_path): - data_dic = cls.get_hap_part_json(hap_file_path) - if not data_dic: - return "" - else: - if "part" in data_dic.keys(): - part_name = data_dic["part"] - return part_name - else: - return "" \ No newline at end of file diff --git a/src/core/testkit/kit_lite.py b/src/core/testkit/kit_lite.py index a9b7e01..37f691d 100755 --- a/src/core/testkit/kit_lite.py +++ b/src/core/testkit/kit_lite.py @@ -49,6 +49,23 @@ class DeployKit(ITestKit): self.burn_command = "" self.timeout = "" self.paths = "" + + def copy_file_as_temp(self, original_file, str_length): + """ + To obtain a random string with specified length + Parameters: + original_file : the original file path + str_length: the length of random string + """ + if os.path.isfile(original_file): + random_str = random.sample(string.ascii_letters + string.digits, + str_length) + new_temp_tool_path = '{}_{}{}'.format( + os.path.splitext(original_file)[0], "".join(random_str), + os.path.splitext(original_file)[1]) + return shutil.copyfile(original_file, new_temp_tool_path) + else: + return "" def __check_config__(self, config): self.timeout = str(int(get_config_value( @@ -63,6 +80,22 @@ class DeployKit(ITestKit): "burn_file:{}".format(self.timeout, self.burn_file) raise ParamError(msg, error_no="00108") + def __setup__(self, device, **kwargs): + """ + Execute reset command on the device by cmd serial port and then upload + patch file by deploy tool. + Parameters: + device: the instance of LocalController with one or more + ComController + """ + args = kwargs + source_file = args.get("source_file", None) + self._reset(device) + self._send_file(device, source_file) + + def __teardown__(self, device): + pass + def _reset(self, device): cmd_com = device.device.com_dict.get(ComType.cmd_com) try: @@ -112,36 +145,3 @@ class DeployKit(ITestKit): device.device_allocation_state = DeviceAllocationState.unusable raise LiteDeviceError("%s port set_up wifiiot failed" % deploy_serial_port, error_no="00402") - - def __setup__(self, device, **kwargs): - """ - Execute reset command on the device by cmd serial port and then upload - patch file by deploy tool. - Parameters: - device: the instance of LocalController with one or more - ComController - """ - args = kwargs - source_file = args.get("source_file", None) - self._reset(device) - self._send_file(device, source_file) - - def __teardown__(self, device): - pass - - def copy_file_as_temp(self, original_file, str_length): - """ - To obtain a random string with specified length - Parameters: - original_file : the original file path - str_length: the length of random string - """ - if os.path.isfile(original_file): - random_str = random.sample(string.ascii_letters + string.digits, - str_length) - new_temp_tool_path = '{}_{}{}'.format( - os.path.splitext(original_file)[0], "".join(random_str), - os.path.splitext(original_file)[1]) - return shutil.copyfile(original_file, new_temp_tool_path) - else: - return "" \ No newline at end of file