#!/usr/bin/env python # -*- coding: utf-8 -*- """ Copyright (c) 2020-2021 Huawei Device Co., Ltd. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import os import sys import argparse import re import subprocess import xml.dom.minidom from xml.parsers.expat import ExpatError from string import Template import utils import json import shutil import glob import stat XTS_RESOURCE_ROOT = 'ivbxfj`qspqsjfubsz0sftpvsdf' def _get_xts_rootpath(project_path): if project_path is not None: return project_path[0:project_path.find('/xts/') + len('/xts/')] raise ValueError('Illegal xts project path ' + project_path) def _get_subsystem_name(project_path): if '/hits/' in project_path: index0 = project_path.find('/hits/') + len('/hits/') elif '/acts/' in project_path: index0 = project_path.find('/acts/') + len('/acts/') else: raise ValueError('Illegal xts project path ' + project_path) index1 = project_path.find('/', index0) return project_path[index0:index1] def _get_resource_rootpath(project_path): xts_root = _get_xts_rootpath(project_path) resource_reldir = ''.join(chr(ord(ch) - 1) for ch in XTS_RESOURCE_ROOT) return os.path.join(xts_root, resource_reldir) class XDeviceBuilder: """ To build XTS as an egg package @arguments(project_dir, output_dirs) """ def __init__(self, arguments): parser = argparse.ArgumentParser() parser.add_argument('--source_dir', help='', required=True) parser.add_argument('--configs_dir', help='', required=False) parser.add_argument('--resources_dir', help='', required=False) parser.add_argument('--suite_out_dir', help='', required=True) self.args = parser.parse_args(arguments) def build_xdevice(self): """ build xdevice package :return: """ ohos_dir = os.path.join(self.args.source_dir, 'plugins', 'ohos') devicetest_dir = os.path.join(self.args.source_dir, 'plugins', 'devicetest') gen_dir0 = os.path.join(self.args.source_dir, 'dist') gen_dir1 = os.path.join(ohos_dir, 'dist') gen_dir2 = os.path.join(devicetest_dir, 'dist') shutil.rmtree(gen_dir0, ignore_errors=True) shutil.rmtree(gen_dir1, ignore_errors=True) shutil.rmtree(gen_dir2, ignore_errors=True) command0 = ["python", "setup.py", "sdist"] command1 = ["python", "setup.py", "sdist"] try: subprocess.check_call(command0, cwd=self.args.source_dir) subprocess.check_call(command1, cwd=ohos_dir) subprocess.check_call(command0, cwd=devicetest_dir) except subprocess.CalledProcessError as exc: print('returncode: {} cmd: {} output: {}'.format( exc.returncode, exc.cmd, exc.output)) run_scripts = ",".join( [os.path.join(self.args.source_dir, "run.bat"), os.path.join(self.args.source_dir, "run.sh")]) dist_tools_dir = os.path.join(self.args.suite_out_dir, 'tools') utils.copy_file(output=dist_tools_dir, source_dirs=gen_dir0, to_dir=True) utils.copy_file(output=dist_tools_dir, source_dirs=gen_dir1, to_dir=True) utils.copy_file(output=dist_tools_dir, source_dirs=gen_dir2, to_dir=True) utils.copy_file(output=self.args.suite_out_dir, sources=run_scripts, to_dir=True) xtssuite_out_dir = self.args.suite_out_dir if (xtssuite_out_dir.find('/acts/') != -1): acts_validator_dir = os.path.join(self.args.suite_out_dir, '../acts-validator') acts_validator_tools_dir = os.path.join(self.args.suite_out_dir, '../acts-validator/tools') utils.copy_file(output=acts_validator_tools_dir, source_dirs=gen_dir0, to_dir=True) utils.copy_file(output=acts_validator_tools_dir, source_dirs=gen_dir1, to_dir=True) utils.copy_file(output=acts_validator_dir, sources=run_scripts, to_dir=True) if self.args.configs_dir: dist_configs_dir = os.path.join(self.args.suite_out_dir, 'config') utils.copy_file(output=dist_configs_dir, source_dirs=self.args.configs_dir, to_dir=True) if (xtssuite_out_dir.find('/acts/') != -1): acts_validator_config_dir = os.path.join(self.args.suite_out_dir, '../acts-validator/config') utils.copy_file(output=acts_validator_config_dir, source_dirs=self.args.configs_dir, to_dir=True) if self.args.resources_dir: dist_resources_dir = os.path.join(self.args.suite_out_dir, 'resource') utils.copy_file(output=dist_resources_dir, source_dirs=self.args.resources_dir, to_dir=True) class SuiteArchiveBuilder: def __init__(self, arguments): self.module_info_dir = None self.arguments = arguments def archive_suite(self): parser = argparse.ArgumentParser() parser.add_argument('--suite_path', help='', required=True) parser.add_argument('--prebuilts_resource', help='', required=True) parser.add_argument('--suite_archive_dir', help='', required=True) parser.add_argument('--make_archive', help='', required=True) args = parser.parse_args(self.arguments) if not args.make_archive.lower() == 'true': print('make archive disabled') return 0 suite_path = args.suite_path if not os.path.isdir(suite_path): raise Exception("[%s] does not exist" % suite_path) copyfiles = args.prebuilts_resource.split(",") for file_path in copyfiles: subprocess.call(["cp", "-rf", file_path, suite_path]) archive_name = os.path.basename(suite_path) archive_root_path = args.suite_archive_dir if not os.path.isdir(archive_root_path): os.mkdir(archive_root_path) # remove the extra output of target "java_prebuilt" # such as ztest-tradefed-common.interface.jar subprocess.call(["find", suite_path, "-name", "*.interface.jar", "-exec", "rm", "{}", "+"]) shutil.make_archive(os.path.join(archive_root_path, archive_name), "zip", suite_path) return 0 class SuiteModuleWithTestbundleBuilder: def __init__(self, arguments): self.arguments = arguments self.args = None def build_module_with_testbundle(self): parser = argparse.ArgumentParser() parser.add_argument('--build_gen_dir', help='', required=True) parser.add_argument('--build_target_name', help='', required=True) parser.add_argument('--subsystem_name', help='', required=False) parser.add_argument('--part_name', help='', required=False) parser.add_argument('--buildgen_testfile', help='', required=True) parser.add_argument('--project_path', help='', required=True) parser.add_argument('--test_xml', help='', required=False) parser.add_argument('--project_type', help='', required=True) parser.add_argument('--suite_out_dir', help='', required=True) parser.add_argument('--archive_testfile', help='', required=True) parser.add_argument('--apilibrary_deps', help='', required=False) parser.add_argument('--test_files', help='', required=False) self.args = parser.parse_args(self.arguments) self._create_testsuite(self.args) return 0 def _create_testsuite(self, args): _test_xml = args.test_xml _testcases_dir = os.path.dirname(args.archive_testfile) _testsuite_name = os.path.basename(args.archive_testfile)\ .replace('.hap', '').replace('module_', '') _testcase_xml = os.path.join(_testcases_dir, _testsuite_name + ".xml") _config_file = os.path.join(_testcases_dir, _testsuite_name + ".config") _json_file = os.path.join(_testcases_dir, _testsuite_name + ".json") if args.project_type == "js_test_hap": self._generate_json_by_template(_test_xml.replace(".xml", ".json"), _testsuite_name, _json_file) dest_file = os.path.join( _testcases_dir, "{}.hap".format(_testsuite_name)) self._copy_file(args.buildgen_testfile, dest_file) os.chmod(dest_file, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) return if args.project_type == "pythontest": self._generate_json_by_template(_test_xml.replace(".xml", ".json"), _testsuite_name, os.path.join( args.archive_testfile, _testsuite_name + ".json")) utils.copy_file(output=self.args.archive_testfile, source_dirs=self.args.test_files) return self._check_file_exist(args.buildgen_testfile) self._copy_file(args.buildgen_testfile, args.archive_testfile) if args.project_type == "app": return self._record_testmodule_info(args.build_target_name, _testsuite_name, _testcases_dir) self._record_testpart_info(args.build_target_name, _testsuite_name, _testcases_dir, args.subsystem_name,args.part_name) if _test_xml and os.path.exists(_test_xml): self._copy_file(_test_xml, _config_file) elif _test_xml.replace(".xml", ".json") and \ os.path.exists(_test_xml.replace(".xml", ".json")): self._generate_json_by_template(_test_xml.replace(".xml", ".json"), _testsuite_name, _json_file) else: self._generate_xml_by_template(_test_xml, _testsuite_name, _config_file) _resource_srcroot = _get_resource_rootpath(args.project_path) self._archive_test_file_to_testcase(_testcases_dir) @staticmethod def _record_testmodule_info(build_target_name, module_name, testcases_dir): if not build_target_name or not module_name: raise ValueError( 'Ethire build_target_name or module_name is invalid') module_info_list_file = os.path.join(testcases_dir, 'module_info.list') lines = [] if os.path.isfile(module_info_list_file): with open(module_info_list_file, 'r') as file_read: for line in file_read: lines.append(line.strip()) new_lines = ['%s %s' % (build_target_name, module_name)] for line in lines: arr = line.strip().split(' ') if len(arr) == 0 or arr[0] == build_target_name: continue new_lines.append(line) # add module info new_lines.sort() with open(module_info_list_file, 'w') as file_write: file_write.write('\n'.join(new_lines) + '\n') @staticmethod def _record_testpart_info(build_target_name, module_name, testcases_dir, subsystem_name, part_name): if not build_target_name or not module_name: raise ValueError( 'Ethire build_target_name or module_name is invalid') module_info_file = os.path.join(testcases_dir, module_name+'.moduleInfo') if os.path.exists(module_info_file): return module_info_data = {'subsystem': subsystem_name, 'part': part_name, 'module': module_name} with open(module_info_file, 'w') as out_file: json.dump(module_info_data, out_file) @staticmethod def _generate_json_by_template(source_file, module_name, dest_file): source_content = utils.read_file(source_file) values = {"module": module_name} xml_content = Template(source_content).substitute(values) utils.write_file(dest_file, xml_content, False) @staticmethod def _generate_xml_by_template(test_xml, module_name, config_file): # find the template file index = test_xml.rfind(".xml") tmpl_file = test_xml[:index] + ".tmpl" if not os.path.exists(tmpl_file): raise Exception("Can't find the Test.xml or " "Test.tmpl in the path %s " % os.path.dirname(test_xml)) tmpl_content = utils.read_file(tmpl_file) values = {"module": module_name} xml_content = Template(tmpl_content).substitute(values) utils.write_file(config_file, xml_content, False) def _copy_file(self, source, target): if not os.path.exists(os.path.dirname(target)): os.makedirs(os.path.dirname(target)) if not os.path.exists(target) or ( os.path.exists(target) and (os.stat(target).st_mtime != os.stat(source).st_mtime)): print('Trying to copy "%s" to "%s"' % (source, target)) subprocess.call(["rm", "-rf", target]) subprocess.call(["cp", "-rf", source, target]) def _copy_file_to_target(self, source_file, dest_file): if not os.path.exists(source_file): print("[ERROR] source_file is not exist. %s" % source_file, file=sys.stderr) return else: self._copy_file(source_file, dest_file) def _archive_test_resource(self, xml_file, root_dir, resource_dir): _pattern = re.compile("[-=]>") if os.path.exists(xml_file): try: dom = xml.dom.minidom.parse(xml_file) childroot = dom.getElementsByTagName("target_preparer") for child in childroot: for child_atrr in child.getElementsByTagName("option"): name = child_atrr.getAttribute("name") attr_value = child_atrr.getAttribute("value") value = _pattern.split(attr_value)[0].strip() if name != "" or value != "": if name == "push" and \ value.startswith("resource/"): self._copy_file_to_target( os.path.join(root_dir, "../", value), os.path.join(resource_dir, value)) else: raise Exception("Test.xml node name and " "value not is null") except IOError as error: print("IO Error: %s" % error, file=sys.stderr) except ExpatError as error: print("ExpatError Error: %s" % error, file=sys.stderr) finally: pass else: with open(xml_file.replace(".config", ".json"), "r", encoding="UTF-8") as json_file: json_str = json.load(json_file) kits = json_str["kits"] for kit in kits: types = kit["type"] if types == "PushKit": push_resources = kit["push"] for resource in push_resources: value = _pattern.split(resource)[0].strip() if value.startswith("resource/"): self._copy_file_to_target( os.path.join(root_dir, "../", value), os.path.join(resource_dir, value)) def _archive_test_file_to_testcase(self, cases_dir): if self.args.test_files is None: return arr_test_files = self.args.test_files.split(',') for file in arr_test_files: if file == "": continue self._copy_file_to_target(file, os.path.join(cases_dir, os.path.basename(file))) def _check_file_exist(self, file_path): if not os.path.exists(file_path): raise Exception("File [%s] does not exist!" % file_path) ACTION_MAP = {"build_module": "SuiteModuleBuilder", "build_xdevice": "XDeviceBuilder", "archive_suite": "SuiteArchiveBuilder", "build_module_with_testbundle": "SuiteModuleWithTestbundleBuilder"} def _find_action(action, arguments): class_name = ACTION_MAP[action] if not class_name: raise ValueError('Unsupported operation: %s' % action) this_module = sys.modules[__name__] class_def = getattr(this_module, class_name, None) if not class_def: raise ValueError( 'Unsupported operation(No Implementation Class): %s' % action) class_obj = class_def(arguments) func = getattr(class_obj, action, None) if not func: raise ValueError( 'Unsupported operation(No Implementation Method): %s' % action) return func def main(arguments): action = arguments[0] args = arguments[1:] func = _find_action(action, args) func() return 0 if __name__ == '__main__': sys.exit(main(sys.argv[1:]))