mirror of
https://gitee.com/openharmony/xts_tools
synced 2024-11-27 01:50:47 +00:00
cd3f6d2ad4
Signed-off-by: qu-xianfei <quxianfei1@huawei.com> Change-Id: I7bf0226bdf87aad78ba14855b7719138c4421fea
424 lines
18 KiB
Python
Executable File
424 lines
18 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Copyright (c) 2020-2021 Huawei Device Co., Ltd.
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
"""
|
|
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import re
|
|
import subprocess
|
|
import xml.dom.minidom
|
|
from xml.parsers.expat import ExpatError
|
|
from string import Template
|
|
import utils
|
|
import json
|
|
import shutil
|
|
import glob
|
|
import stat
|
|
|
|
|
|
XTS_RESOURCE_ROOT = 'ivbxfj`qspqsjfubsz0sftpvsdf'
|
|
|
|
|
|
def _get_xts_rootpath(project_path):
|
|
if project_path is not None:
|
|
return project_path[0:project_path.find('/xts/') + len('/xts/')]
|
|
raise ValueError('Illegal xts project path ' + project_path)
|
|
|
|
|
|
def _get_subsystem_name(project_path):
|
|
if '/hits/' in project_path:
|
|
index0 = project_path.find('/hits/') + len('/hits/')
|
|
elif '/acts/' in project_path:
|
|
index0 = project_path.find('/acts/') + len('/acts/')
|
|
else:
|
|
raise ValueError('Illegal xts project path ' + project_path)
|
|
index1 = project_path.find('/', index0)
|
|
return project_path[index0:index1]
|
|
|
|
|
|
def _get_resource_rootpath(project_path):
|
|
xts_root = _get_xts_rootpath(project_path)
|
|
resource_reldir = ''.join(chr(ord(ch) - 1) for ch in XTS_RESOURCE_ROOT)
|
|
return os.path.join(xts_root, resource_reldir)
|
|
|
|
|
|
class XDeviceBuilder:
|
|
"""
|
|
To build XTS as an egg package
|
|
@arguments(project_dir, output_dirs)
|
|
"""
|
|
|
|
def __init__(self, arguments):
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--source_dir', help='', required=True)
|
|
parser.add_argument('--configs_dir', help='', required=False)
|
|
parser.add_argument('--resources_dir', help='', required=False)
|
|
parser.add_argument('--suite_out_dir', help='', required=True)
|
|
self.args = parser.parse_args(arguments)
|
|
|
|
def build_xdevice(self):
|
|
"""
|
|
build xdevice package
|
|
:return:
|
|
"""
|
|
ohos_dir = os.path.join(self.args.source_dir, 'plugins', 'ohos')
|
|
devicetest_dir = os.path.join(self.args.source_dir, 'plugins', 'devicetest')
|
|
gen_dir0 = os.path.join(self.args.source_dir, 'dist')
|
|
gen_dir1 = os.path.join(ohos_dir, 'dist')
|
|
gen_dir2 = os.path.join(devicetest_dir, 'dist')
|
|
shutil.rmtree(gen_dir0, ignore_errors=True)
|
|
shutil.rmtree(gen_dir1, ignore_errors=True)
|
|
shutil.rmtree(gen_dir2, ignore_errors=True)
|
|
command0 = ["python", "setup.py", "sdist"]
|
|
command1 = ["python", "setup.py", "sdist"]
|
|
try:
|
|
subprocess.check_call(command0, cwd=self.args.source_dir)
|
|
subprocess.check_call(command1, cwd=ohos_dir)
|
|
subprocess.check_call(command0, cwd=devicetest_dir)
|
|
except subprocess.CalledProcessError as exc:
|
|
print('returncode: {} cmd: {} output: {}'.format(
|
|
exc.returncode, exc.cmd, exc.output))
|
|
|
|
run_scripts = ",".join(
|
|
[os.path.join(self.args.source_dir, "run.bat"),
|
|
os.path.join(self.args.source_dir, "run.sh")])
|
|
|
|
dist_tools_dir = os.path.join(self.args.suite_out_dir, 'tools')
|
|
utils.copy_file(output=dist_tools_dir, source_dirs=gen_dir0,
|
|
to_dir=True)
|
|
utils.copy_file(output=dist_tools_dir, source_dirs=gen_dir1,
|
|
to_dir=True)
|
|
utils.copy_file(output=dist_tools_dir, source_dirs=gen_dir2,
|
|
to_dir=True)
|
|
utils.copy_file(output=self.args.suite_out_dir, sources=run_scripts,
|
|
to_dir=True)
|
|
xtssuite_out_dir = self.args.suite_out_dir
|
|
if (xtssuite_out_dir.find('/acts/') != -1):
|
|
acts_validator_dir = os.path.join(self.args.suite_out_dir, '../acts-validator')
|
|
acts_validator_tools_dir = os.path.join(self.args.suite_out_dir, '../acts-validator/tools')
|
|
utils.copy_file(output=acts_validator_tools_dir, source_dirs=gen_dir0,
|
|
to_dir=True)
|
|
utils.copy_file(output=acts_validator_tools_dir, source_dirs=gen_dir1,
|
|
to_dir=True)
|
|
utils.copy_file(output=acts_validator_dir, sources=run_scripts,
|
|
to_dir=True)
|
|
|
|
|
|
if self.args.configs_dir:
|
|
dist_configs_dir = os.path.join(self.args.suite_out_dir, 'config')
|
|
utils.copy_file(output=dist_configs_dir,
|
|
source_dirs=self.args.configs_dir, to_dir=True)
|
|
if (xtssuite_out_dir.find('/acts/') != -1):
|
|
acts_validator_config_dir = os.path.join(self.args.suite_out_dir, '../acts-validator/config')
|
|
utils.copy_file(output=acts_validator_config_dir,
|
|
source_dirs=self.args.configs_dir, to_dir=True)
|
|
if self.args.resources_dir:
|
|
dist_resources_dir = os.path.join(self.args.suite_out_dir,
|
|
'resource')
|
|
utils.copy_file(output=dist_resources_dir,
|
|
source_dirs=self.args.resources_dir, to_dir=True)
|
|
|
|
|
|
class SuiteArchiveBuilder:
|
|
def __init__(self, arguments):
|
|
self.module_info_dir = None
|
|
self.arguments = arguments
|
|
|
|
def archive_suite(self):
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--suite_path', help='', required=True)
|
|
parser.add_argument('--prebuilts_resource', help='', required=True)
|
|
parser.add_argument('--suite_archive_dir', help='', required=True)
|
|
parser.add_argument('--make_archive', help='', required=True)
|
|
args = parser.parse_args(self.arguments)
|
|
|
|
if not args.make_archive.lower() == 'true':
|
|
print('make archive disabled')
|
|
return 0
|
|
|
|
suite_path = args.suite_path
|
|
if not os.path.isdir(suite_path):
|
|
raise Exception("[%s] does not exist" % suite_path)
|
|
|
|
copyfiles = args.prebuilts_resource.split(",")
|
|
for file_path in copyfiles:
|
|
subprocess.call(["cp", "-rf", file_path, suite_path])
|
|
|
|
archive_name = os.path.basename(suite_path)
|
|
archive_root_path = args.suite_archive_dir
|
|
if not os.path.isdir(archive_root_path):
|
|
os.mkdir(archive_root_path)
|
|
# remove the extra output of target "java_prebuilt"
|
|
# such as ztest-tradefed-common.interface.jar
|
|
subprocess.call(["find", suite_path, "-name", "*.interface.jar",
|
|
"-exec", "rm", "{}", "+"])
|
|
shutil.make_archive(os.path.join(archive_root_path, archive_name),
|
|
"zip", suite_path)
|
|
return 0
|
|
|
|
|
|
class SuiteModuleWithTestbundleBuilder:
|
|
|
|
def __init__(self, arguments):
|
|
self.arguments = arguments
|
|
self.args = None
|
|
|
|
def build_module_with_testbundle(self):
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--build_gen_dir', help='', required=True)
|
|
parser.add_argument('--build_target_name', help='', required=True)
|
|
parser.add_argument('--subsystem_name', help='',
|
|
required=False)
|
|
parser.add_argument('--part_name', help='',
|
|
required=False)
|
|
parser.add_argument('--buildgen_testfile', help='', required=True)
|
|
parser.add_argument('--project_path', help='', required=True)
|
|
parser.add_argument('--test_xml', help='', required=False)
|
|
parser.add_argument('--project_type', help='', required=True)
|
|
parser.add_argument('--suite_out_dir', help='', required=True)
|
|
parser.add_argument('--archive_testfile', help='', required=True)
|
|
parser.add_argument('--apilibrary_deps', help='', required=False)
|
|
parser.add_argument('--test_files', help='', required=False)
|
|
|
|
self.args = parser.parse_args(self.arguments)
|
|
|
|
self._create_testsuite(self.args)
|
|
return 0
|
|
|
|
def _create_testsuite(self, args):
|
|
_test_xml = args.test_xml
|
|
_testcases_dir = os.path.dirname(args.archive_testfile)
|
|
_testsuite_name = os.path.basename(args.archive_testfile)\
|
|
.replace('.hap', '').replace('module_', '')
|
|
_testcase_xml = os.path.join(_testcases_dir, _testsuite_name + ".xml")
|
|
_config_file = os.path.join(_testcases_dir,
|
|
_testsuite_name + ".config")
|
|
_json_file = os.path.join(_testcases_dir, _testsuite_name + ".json")
|
|
|
|
if args.project_type == "js_test_hap":
|
|
self._generate_json_by_template(_test_xml.replace(".xml", ".json"),
|
|
_testsuite_name, _json_file)
|
|
dest_file = os.path.join(
|
|
_testcases_dir, "{}.hap".format(_testsuite_name))
|
|
self._copy_file(args.buildgen_testfile, dest_file)
|
|
os.chmod(dest_file, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
|
|
return
|
|
if args.project_type == "pythontest":
|
|
self._generate_json_by_template(_test_xml.replace(".xml", ".json"),
|
|
_testsuite_name, os.path.join(
|
|
args.archive_testfile, _testsuite_name + ".json"))
|
|
utils.copy_file(output=self.args.archive_testfile,
|
|
source_dirs=self.args.test_files)
|
|
return
|
|
self._check_file_exist(args.buildgen_testfile)
|
|
self._copy_file(args.buildgen_testfile, args.archive_testfile)
|
|
if args.project_type == "app":
|
|
return
|
|
self._record_testmodule_info(args.build_target_name,
|
|
_testsuite_name,
|
|
_testcases_dir)
|
|
self._record_testpart_info(args.build_target_name,
|
|
_testsuite_name,
|
|
_testcases_dir,
|
|
args.subsystem_name,args.part_name)
|
|
if _test_xml and os.path.exists(_test_xml):
|
|
self._copy_file(_test_xml, _config_file)
|
|
elif _test_xml.replace(".xml", ".json") and \
|
|
os.path.exists(_test_xml.replace(".xml", ".json")):
|
|
self._generate_json_by_template(_test_xml.replace(".xml", ".json"),
|
|
_testsuite_name, _json_file)
|
|
else:
|
|
self._generate_xml_by_template(_test_xml, _testsuite_name,
|
|
_config_file)
|
|
_resource_srcroot = _get_resource_rootpath(args.project_path)
|
|
self._archive_test_file_to_testcase(_testcases_dir)
|
|
|
|
@staticmethod
|
|
def _record_testmodule_info(build_target_name, module_name, testcases_dir):
|
|
if not build_target_name or not module_name:
|
|
raise ValueError(
|
|
'Ethire build_target_name or module_name is invalid')
|
|
|
|
module_info_list_file = os.path.join(testcases_dir, 'module_info.list')
|
|
lines = []
|
|
if os.path.isfile(module_info_list_file):
|
|
with open(module_info_list_file, 'r') as file_read:
|
|
for line in file_read:
|
|
lines.append(line.strip())
|
|
|
|
new_lines = ['%s %s' % (build_target_name, module_name)]
|
|
for line in lines:
|
|
arr = line.strip().split(' ')
|
|
if len(arr) == 0 or arr[0] == build_target_name:
|
|
continue
|
|
new_lines.append(line)
|
|
|
|
# add module info
|
|
new_lines.sort()
|
|
with open(module_info_list_file, 'w') as file_write:
|
|
file_write.write('\n'.join(new_lines) + '\n')
|
|
|
|
@staticmethod
|
|
def _record_testpart_info(build_target_name, module_name, testcases_dir, subsystem_name, part_name):
|
|
if not build_target_name or not module_name:
|
|
raise ValueError(
|
|
'Ethire build_target_name or module_name is invalid')
|
|
|
|
module_info_file = os.path.join(testcases_dir, module_name+'.moduleInfo')
|
|
|
|
if os.path.exists(module_info_file):
|
|
return
|
|
module_info_data = {'subsystem': subsystem_name, 'part': part_name,
|
|
'module': module_name}
|
|
with open(module_info_file, 'w') as out_file:
|
|
json.dump(module_info_data, out_file)
|
|
|
|
@staticmethod
|
|
def _generate_json_by_template(source_file, module_name, dest_file):
|
|
source_content = utils.read_file(source_file)
|
|
values = {"module": module_name}
|
|
xml_content = Template(source_content).substitute(values)
|
|
utils.write_file(dest_file, xml_content, False)
|
|
|
|
@staticmethod
|
|
def _generate_xml_by_template(test_xml, module_name,
|
|
config_file):
|
|
# find the template file
|
|
index = test_xml.rfind(".xml")
|
|
tmpl_file = test_xml[:index] + ".tmpl"
|
|
if not os.path.exists(tmpl_file):
|
|
raise Exception("Can't find the Test.xml or "
|
|
"Test.tmpl in the path %s " %
|
|
os.path.dirname(test_xml))
|
|
tmpl_content = utils.read_file(tmpl_file)
|
|
values = {"module": module_name}
|
|
xml_content = Template(tmpl_content).substitute(values)
|
|
utils.write_file(config_file, xml_content, False)
|
|
|
|
def _copy_file(self, source, target):
|
|
if not os.path.exists(os.path.dirname(target)):
|
|
os.makedirs(os.path.dirname(target))
|
|
if not os.path.exists(target) or (
|
|
os.path.exists(target) and
|
|
(os.stat(target).st_mtime != os.stat(source).st_mtime)):
|
|
print('Trying to copy "%s" to "%s"' % (source, target))
|
|
subprocess.call(["rm", "-rf", target])
|
|
subprocess.call(["cp", "-rf", source, target])
|
|
|
|
def _copy_file_to_target(self, source_file, dest_file):
|
|
if not os.path.exists(source_file):
|
|
print("[ERROR] source_file is not exist. %s" % source_file,
|
|
file=sys.stderr)
|
|
return
|
|
else:
|
|
self._copy_file(source_file, dest_file)
|
|
|
|
def _archive_test_resource(self, xml_file, root_dir, resource_dir):
|
|
_pattern = re.compile("[-=]>")
|
|
if os.path.exists(xml_file):
|
|
try:
|
|
dom = xml.dom.minidom.parse(xml_file)
|
|
childroot = dom.getElementsByTagName("target_preparer")
|
|
for child in childroot:
|
|
for child_atrr in child.getElementsByTagName("option"):
|
|
name = child_atrr.getAttribute("name")
|
|
attr_value = child_atrr.getAttribute("value")
|
|
value = _pattern.split(attr_value)[0].strip()
|
|
if name != "" or value != "":
|
|
if name == "push" and \
|
|
value.startswith("resource/"):
|
|
self._copy_file_to_target(
|
|
os.path.join(root_dir, "../", value),
|
|
os.path.join(resource_dir, value))
|
|
else:
|
|
raise Exception("Test.xml node name and "
|
|
"value not is null")
|
|
except IOError as error:
|
|
print("IO Error: %s" % error, file=sys.stderr)
|
|
except ExpatError as error:
|
|
print("ExpatError Error: %s" % error, file=sys.stderr)
|
|
finally:
|
|
pass
|
|
else:
|
|
with open(xml_file.replace(".config", ".json"),
|
|
"r", encoding="UTF-8") as json_file:
|
|
json_str = json.load(json_file)
|
|
kits = json_str["kits"]
|
|
for kit in kits:
|
|
types = kit["type"]
|
|
if types == "PushKit":
|
|
push_resources = kit["push"]
|
|
for resource in push_resources:
|
|
value = _pattern.split(resource)[0].strip()
|
|
if value.startswith("resource/"):
|
|
self._copy_file_to_target(
|
|
os.path.join(root_dir, "../", value),
|
|
os.path.join(resource_dir, value))
|
|
|
|
def _archive_test_file_to_testcase(self, cases_dir):
|
|
if self.args.test_files is None:
|
|
return
|
|
arr_test_files = self.args.test_files.split(',')
|
|
for file in arr_test_files:
|
|
if file == "":
|
|
continue
|
|
self._copy_file_to_target(file,
|
|
os.path.join(cases_dir,
|
|
os.path.basename(file)))
|
|
|
|
def _check_file_exist(self, file_path):
|
|
if not os.path.exists(file_path):
|
|
raise Exception("File [%s] does not exist!" % file_path)
|
|
|
|
|
|
ACTION_MAP = {"build_module": "SuiteModuleBuilder",
|
|
"build_xdevice": "XDeviceBuilder",
|
|
"archive_suite": "SuiteArchiveBuilder",
|
|
"build_module_with_testbundle":
|
|
"SuiteModuleWithTestbundleBuilder"}
|
|
|
|
|
|
def _find_action(action, arguments):
|
|
class_name = ACTION_MAP[action]
|
|
if not class_name:
|
|
raise ValueError('Unsupported operation: %s' % action)
|
|
|
|
this_module = sys.modules[__name__]
|
|
class_def = getattr(this_module, class_name, None)
|
|
if not class_def:
|
|
raise ValueError(
|
|
'Unsupported operation(No Implementation Class): %s' % action)
|
|
class_obj = class_def(arguments)
|
|
func = getattr(class_obj, action, None)
|
|
if not func:
|
|
raise ValueError(
|
|
'Unsupported operation(No Implementation Method): %s' % action)
|
|
return func
|
|
|
|
|
|
def main(arguments):
|
|
action = arguments[0]
|
|
args = arguments[1:]
|
|
func = _find_action(action, args)
|
|
func()
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main(sys.argv[1:]))
|