!308 回退修改,补充opensource测试驱动、build only测试驱动

Merge pull request !308 from liguangjie/master
This commit is contained in:
openharmony_ci 2024-11-08 07:28:51 +00:00 committed by Gitee
commit 9eaa23858a
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
7 changed files with 278 additions and 8 deletions

View File

@ -53,6 +53,8 @@ setup(
'oh_kernel_driver=ohos.drivers.oh_kernel_driver',
'oh_yara_driver=ohos.drivers.oh_yara_driver',
'c_driver_lite=ohos.drivers.c_driver_lite',
'opensource_driver_lite=ohos.drivers.opensource_driver_lite',
'build_only_driver_lite=ohos.drivers.build_only_driver_lite'
],
'listener': [
'listener=ohos.executor.listener',

View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import stat
from ohos.drivers import *
from ohos.constants import ParserType
from ohos.environment.dmlib_lite import generate_report
__all__ = ["BuildOnlyTestDriver"]
LOG = platform_logger("BuildOnlyTestDriver")
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.build_only_test)
class BuildOnlyTestDriver(IDriver):
"""
BuildOnlyTest is a test that runs a native test package on given
device lite device.
"""
config = None
result = ""
error_message = ""
def __init__(self):
self.file_name = ""
self.config_file = None
self.testcases_path = None
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def __execute__(self, request):
self.config = request.config
self.config.device = request.config.environment.devices[0]
self.file_name = request.root.source.test_name
self.config_file = request.root.source.config_file
self.testcases_path = request.config.testcases_path
file_path = self._get_log_file()
result_list = self._get_result_list(file_path)
if len(result_list) == 0:
LOG.error(
"Error: source don't exist %s." % request.root.source.
source_file, error_no="00101")
return
total_result = ''
for result in result_list:
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(result, flags, modes), "r",
encoding="utf-8") as file_content:
result = file_content.read()
if not result.endswith('\n'):
result = '%s\n' % result
total_result = '{}{}'.format(total_result, result)
parsers = get_plugin(Plugin.PARSER, ParserType.build_only_test)
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suite_name = self.file_name
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
generate_report(handler, total_result)
@classmethod
def _get_result_list(cls, file_path):
result_list = list()
for root_path, dirs_path, file_names in os.walk(file_path):
for file_name in file_names:
if file_name == "logfile":
result_list.append(os.path.join(root_path, file_name))
return result_list
def _get_log_file(self):
json_config = JsonParser(self.config_file)
log_path = get_config_value('log_path', json_config.get_driver(),
False)
log_path = str(log_path.replace("/", "", 1)) if log_path.startswith(
"/") else str(log_path)
LOG.debug("The log path is:%s" % log_path)
file_path = get_file_absolute_path(log_path,
paths=[self.testcases_path])
LOG.debug("The file path is:%s" % file_path)
return file_path
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -0,0 +1,156 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from xdevice import DeviceLabelType
from ohos.drivers import *
from ohos.constants import ParserType
from ohos.drivers.constants import init_remote_server
from ohos.exception import LiteDeviceExecuteCommandError
from ohos.error import ErrorMessage
__all__ = ["OpenSourceTestDriver"]
LOG = platform_logger("OpenSourceTestDriver")
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.open_source_test)
class OpenSourceTestDriver(IDriver):
"""
OpenSourceTest is a test that runs a native test package on given
device lite device.
"""
config = None
result = ""
error_message = ""
has_param = False
def __init__(self):
self.rerun = True
self.file_name = ""
self.handler = None
def __check_environment__(self, device_options):
if len(device_options) != 1 or \
device_options[0].label != DeviceLabelType.ipcamera:
self.error_message = "check environment failed"
return False
return True
def __check_config__(self, config=None):
pass
def __execute__(self, request):
kits = []
try:
self.config = request.config
setattr(self.config, "command_result", "")
self.config.device = request.config.environment.devices[0]
init_remote_server(self, request)
config_file = request.root.source.config_file
json_config = JsonParser(config_file)
pre_cmd = get_config_value('pre_cmd', json_config.get_driver(),
False)
execute_dir = get_config_value('execute', json_config.get_driver(),
False)
kits = get_kit_instances(json_config,
request.config.resource_path,
request.config.testcases_path)
for kit in kits:
if not Binder.is_executing():
raise ExecuteTerminate(ErrorMessage.Common.Code_0301013)
copy_list = kit.__setup__(request.config.device,
request=request)
self.file_name = request.root.source.test_name
self.set_file_name(request, request.root.source.test_name)
self.config.device.execute_command_with_timeout(
command=pre_cmd, timeout=1)
self.config.device.execute_command_with_timeout(
command="cd {}".format(execute_dir), timeout=1)
device_log_file = get_device_log_file(
request.config.report_path,
request.config.device.__get_serial__(),
repeat=request.config.repeat,
repeat_round=request.get_repeat_round())
device_log_file_open = \
os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
os.O_APPEND, FilePermission.mode_755)
with os.fdopen(device_log_file_open, "a") as file_name:
for test_bin in copy_list:
if not test_bin.endswith(".run-test"):
continue
if test_bin.startswith("/"):
command = ".%s" % test_bin
else:
command = "./%s" % test_bin
self._do_test_run(command, request)
file_name.write(self.config.command_result)
file_name.flush()
except (LiteDeviceExecuteCommandError, Exception) as exception:
LOG.error(exception, error_no=getattr(exception, "error_no",
"00000"))
self.error_message = exception
finally:
LOG.info("-------------finally-----------------")
# umount the dirs already mount
for kit in kits:
kit.__teardown__(request.config.device)
self.config.device.close()
report_name = "report" if request.root.source. \
test_name.startswith("{") else get_filename_extension(
request.root.source.test_name)[0]
self.result = check_result_report(
request.config.report_path, self.result, self.error_message,
report_name)
def set_file_name(self, request, bin_file):
self.result = "%s.xml" % os.path.join(
request.config.report_path, "result", bin_file)
def run(self, command=None, listener=None, timeout=20):
parsers = get_plugin(Plugin.PARSER,
ParserType.open_source_test)
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suite_name = self.file_name
parser_instance.test_name = command.replace("./", "")
parser_instance.listeners = listener
parser_instances.append(parser_instance)
self.handler = ShellHandler(parser_instances)
for _ in range(3):
result, _, error = self.config.device.execute_command_with_timeout(
command=command, case_type=DeviceTestType.open_source_test,
timeout=timeout, receiver=self.handler)
self.config.command_result = result
if "pass" in result.lower():
break
return error, result, self.handler
def _do_test_run(self, command, request):
listeners = request.listeners
for listener in listeners:
listener.device_sn = self.config.device.device_sn
error, _, _ = self.run(command, listeners, timeout=60)
if error:
LOG.error(
"Execute %s failed" % command, error_no="00402")
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -58,7 +58,7 @@ ID_LIST = b'LIST'
ID_DENT = b'DENT'
DEFAULT_ENCODING = "utf-8"
COMPATIBLE_ENCODING = "ISO-8850-1"
COMPATIBLE_ENCODING = "ISO-8859-1"
SYNC_DATA_MAX = 64 * 1024
REMOTE_PATH_MAX_LENGTH = 1024
SOCK_DATA_MAX = 256

View File

@ -176,12 +176,18 @@ class StackReportListener(UniversalReportListener):
self._suite_name_mapping.update({suite.suite_name: suite.index})
if self.cycle > 0:
suite_index = self._suite_name_mapping.get(suite.suite_name, "")
for suite_item, result_list in self.result:
if suite_item.index != suite_index:
continue
self.suites.pop(suite.index)
result_list.extend(results_of_same_suite)
break
if suite_index not in self.suite_distributions.keys():
for suite_item, result_list in self.result:
if suite_item.index != suite_index:
continue
self.suites.pop(suite.index)
if result_list and result_list[-1].is_completed is not True:
result_list.pop(-1)
result_list.extend(results_of_same_suite)
break
else:
self.suite_distributions.update({suite.index: len(self.result)})
self.result.append((self.suites.get(suite.index), results_of_same_suite))
else:
self.suite_distributions.update({suite.index: len(self.result)})
self.result.append((self.suites.get(suite.index), results_of_same_suite))

View File

@ -226,6 +226,7 @@ class OHJSUnitTestParser(IParser):
return
if test_info.run_time == 0 or test_info.run_time < self.test_time:
test_info.run_time = self.test_time
test_info.is_completed = True
for listener in self.get_listeners():
result = copy.copy(test_info)
result.code = test_info.code
@ -234,7 +235,6 @@ class OHJSUnitTestParser(IParser):
continue
if self.runner.retry_times > 1 and result.code == ResultCode.FAILED.value:
listener.tests.pop(test_info.index)
test_info.is_completed = True
self.clear_current_test_info()
def handle_suite_end(self):

View File

@ -200,6 +200,7 @@ class ReportEventListener(AbsReportListener, ABC):
test.stacktrace = test_result.stacktrace
test.code = test_result.code
test.report = test_result.report
test.is_completed = test_result.is_completed
def _handle_test_suites_end(self, test_result, kwargs):
if not kwargs.get("suite_report", False):