1、修改OHJS驱动的重跑机制 2、适配OHJS多测试套情况

Signed-off-by: deveco_test <liguangjie1@huawei.com>
This commit is contained in:
deveco_test 2022-08-12 15:34:02 +08:00
parent 75f62eeead
commit 50b39dc808
3 changed files with 221 additions and 129 deletions

View File

@ -326,7 +326,7 @@ class OHJSUnitTestDriver(IDriver):
self.config.device.connector_command("target mount")
do_module_kit_setup(request, self.kits)
self.runner = OHJSUnitTestRunner(self.config)
self.runner.suite_name = request.get_module_name()
self.runner.suites_name = request.get_module_name()
# execute test case
self._get_runner_config(json_config)
oh_jsunit_para_parse(self.runner, self.config.testargs)
@ -376,6 +376,7 @@ class OHJSUnitTestDriver(IDriver):
if test_to_run else 0))
if not test_to_run or not self.rerun:
self.runner.run(listener)
self.runner.notify_finished()
else:
self._run_with_rerun(listener, test_to_run)
@ -402,35 +403,43 @@ class OHJSUnitTestDriver(IDriver):
expected_tests = TestDescription.remove_test(expected_tests,
test_run)
if not expected_tests:
LOG.debug("No tests to re-run, all tests executed at least "
"once.")
if self.rerun_all:
self._rerun_all(expected_tests, listener)
LOG.debug("No tests to re-run twice,please check")
self.runner.notify_finished()
else:
self._rerun_serially(expected_tests, listener)
self._rerun_twice(expected_tests, listener)
else:
LOG.debug("Rerun once success")
self.runner.notify_finished()
def _rerun_all(self, expected_tests, listener):
def _rerun_twice(self, expected_tests, listener):
tests = []
for test in expected_tests:
tests.append("%s#%s" % (test.class_name, test.test_name))
self.runner.add_arg("class", ",".join(tests))
LOG.debug("Ready to rerun all, expect run: %s" % len(expected_tests))
LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
test_run = self._run_tests(listener)
LOG.debug("Rerun all, has run: %s" % len(test_run))
LOG.debug("Rerun twice, has run: %s" % len(test_run))
if len(test_run) < len(expected_tests):
expected_tests = TestDescription.remove_test(expected_tests,
test_run)
if not expected_tests:
LOG.debug("Rerun textFile success")
self._rerun_serially(expected_tests, listener)
LOG.debug("No tests to re-run third,please check")
self.runner.notify_finished()
else:
self._rerun_third(expected_tests, listener)
else:
LOG.debug("Rerun twice success")
self.runner.notify_finished()
def _rerun_serially(self, expected_tests, listener):
LOG.debug("Rerun serially, expected run: %s" % len(expected_tests))
def _rerun_third(self, expected_tests, listener):
tests = []
for test in expected_tests:
self.runner.add_arg(
"class", "%s#%s" % (test.class_name, test.test_name))
self.runner.rerun(listener, test)
self.runner.remove_arg("class")
tests.append("%s#%s" % (test.class_name, test.test_name))
self.runner.add_arg("class", ",".join(tests))
LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
self._run_tests(listener)
LOG.debug("Rerun third success")
self.runner.notify_finished()
def __result__(self):
return self.result if os.path.exists(self.result) else ""
@ -444,7 +453,7 @@ class OHJSUnitTestRunner:
self.rerun_attemp = 3
self.suite_recorder = {}
self.finished = False
self.expect_tests_dict = None
self.expect_tests_dict = dict()
self.finished_observer = None
def dry_run(self):
@ -459,10 +468,20 @@ class OHJSUnitTestRunner:
command = self._get_dry_run_command()
self.config.device.execute_shell_command(
command, timeout=self.config.timeout, receiver=handler, retry=0)
self.expect_tests_dict = parser_instances[0].tests_dict
return parser_instances[0].tests
def run(self, listener):
handler = self._get_shell_handler(listener)
command = self._get_run_command()
self.config.device.execute_shell_command(
command, timeout=self.config.timeout, receiver=handler, retry=0)
def notify_finished(self):
if self.finished_observer:
self.finished_observer.notify_task_finished()
def _get_shell_handler(self, listener):
parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
if parsers:
parsers = parsers[:1]
@ -471,54 +490,11 @@ class OHJSUnitTestRunner:
parser_instance = parser.__class__()
parser_instance.suites_name = self.suites_name
parser_instance.listeners = listener
parser_instances.runner = self
parser_instances.append(parser_instance)
self.finished_observer = parser_instance
handler = ShellHandler(parser_instances)
command = self._get_run_command()
self.config.device.execute_shell_command(
command, timeout=self.config.timeout, receiver=handler, retry=0)
def rerun(self, listener, test):
handler = None
if self.rerun_attemp:
test_tracker = CollectingPassListener()
try:
listener_copy = listener.copy()
listener_copy.append(test_tracker)
parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
if parsers:
parsers = parsers[:1]
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suite_name = self.suite_name
parser_instance.listeners = listener_copy
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
command = self._get_run_command()
self.config.device.execute_shell_command(
command, timeout=self.config.timeout, receiver=handler,
retry=0)
except ShellCommandUnresponsiveException as _:
LOG.debug("Exception: ShellCommandUnresponsiveException")
finally:
if not len(test_tracker.get_current_run_results()):
LOG.debug("No test case is obtained finally")
self.rerun_attemp -= 1
handler.parsers[0].mark_test_as_blocked(test)
else:
LOG.debug("Not execute and mark as blocked finally")
parsers = get_plugin(Plugin.PARSER, CommonParserType.cpptest)
if parsers:
parsers = parsers[:1]
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suite_name = self.suite_name
parser_instance.listeners = listener
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
handler.parsers[0].mark_test_as_blocked(test)
return handler
def add_arg(self, name, value):
if not name or not value:

View File

@ -1009,7 +1009,16 @@ class OHJSUnitPrefixes(Enum):
STATUS_CODE = "OHOS_REPORT_STATUS_CODE: "
RESULT = "OHOS_REPORT_RESULT: "
CODE = "OHOS_REPORT_CODE: "
TestFinished = "TestFinished-ResultCode: 0"
TEST_FINISHED_RESULT_MSG = "TestFinished-ResultMsg: "
class OHJSUnitItemConstants(Enum):
CLASS = "class"
TEST = "test"
NUM_TESTS = "numtests"
STACK = "stack"
SUITE_COMSUMING = "suiteconsuming"
APP_DIED = "App died"
@Plugin(type=Plugin.PARSER, id=CommonParserType.oh_jsunit)
@ -1024,6 +1033,8 @@ class OHJSUnitTestParser(IParser):
self.start_time = datetime.datetime.now()
self.test_time = 0
self.test_run_finished = False
self.cur_num = -1
self.runner = None
def get_suite_name(self):
return self.suites_name
@ -1040,31 +1051,64 @@ class OHJSUnitTestParser(IParser):
def parse(self, line):
if not str(line).strip():
return
if line.startswith(OHJSUnitPrefixes.STATUS.value):
self.submit_current_key_value()
self.parse_key(line, len(OHJSUnitPrefixes.STATUS.value))
if line.startswith(OHJSUnitPrefixes.SUM.value):
self.handle_sum_line(line)
elif line.startswith(OHJSUnitPrefixes.STATUS.value):
self.handle_status_line(line)
elif line.startswith(OHJSUnitPrefixes.STATUS_CODE.value):
self.submit_current_key_value()
self.parse_status_code(line)
elif line.startswith(OHJSUnitPrefixes.TestFinished.value):
self.handle_suite_end()
elif line.startswith(OHJSUnitPrefixes.TEST_FINISHED_RESULT_MSG.value):
self._handle_result_msg(line)
def handle_sum_line(self, line):
value = line[len(OHJSUnitPrefixes.SUM.value):].split("=", 1)[0]
self.cur_num = int(value)
def handle_status_line(self, line):
self.parse_key(line, len(OHJSUnitPrefixes.STATUS.value))
if self.cur_num > 0 and \
self.current_key == OHJSUnitItemConstants.CLASS.value:
if self.current_value not in self.runner.suite_recorder.keys():
current_suite = self.state_machine.suite(reset=True)
current_suite.test_num = self.cur_num
current_suite.suite_name = self.current_value
self.runner.suite_recorder.update({
self.current_value:
[len(self.runner.suite_recorder.keys()),
current_suite]})
else:
current_suite = self.runner.suite_recorder.get(
self.current_value)[1]
self.state_machine.current_suite = current_suite
self.cur_num = -1
self.current_key = None
self.current_value = None
self.state_machine.running_test_index = 0
for listener in self.get_listeners():
suite = copy.copy(current_suite)
listener.__started__(LifeCycle.TestSuite, suite)
else:
if self.current_key == OHJSUnitItemConstants.SUITE_COMSUMING.value:
self.test_time = int(self.current_value)
self.handle_suite_end()
else:
self.submit_current_key_value()
self.parse_key(line, len(OHJSUnitPrefixes.STATUS.value))
def submit_current_key_value(self):
if self.current_key and self.current_value:
status_value = self.current_value
test_info = self.state_machine.test()
if self.current_key == "class":
if self.current_key == OHJSUnitItemConstants.CLASS.value:
test_info.test_class = status_value
elif self.current_key == "test":
elif self.current_key == OHJSUnitItemConstants.TEST.value:
test_info.test_name = status_value
elif self.current_key == "numtests":
elif self.current_key == OHJSUnitItemConstants.NUM_TESTS.value:
test_info.num_tests = int(status_value)
elif self.current_key == "Error":
self.handle_test_run_failed(status_value)
elif self.current_key == "stack":
elif self.current_key == OHJSUnitItemConstants.STACK.value:
test_info.stacktrace = status_value
elif self.current_key == "stream":
pass
self.current_key = None
self.current_value = None
@ -1091,13 +1135,13 @@ class OHJSUnitTestParser(IParser):
if not test_info.test_name or not test_info.test_class:
LOG.info("Invalid instrumentation status bundle")
return
self.report_test_run_started(test_info)
if test_info.code == StatusCodes.START.value:
self.start_time = datetime.datetime.now()
for listener in self.get_listeners():
result = copy.copy(test_info)
listener.__started__(LifeCycle.TestCase, result)
elif test_info.code == StatusCodes.FAILURE.value:
return
if test_info.code == StatusCodes.FAILURE.value:
self.state_machine.running_test_index += 1
test_info.current = self.state_machine.running_test_index
end_time = datetime.datetime.now()
@ -1137,15 +1181,17 @@ class OHJSUnitTestParser(IParser):
listener.__ended__(LifeCycle.TestCase, result)
test_info.is_completed = True
def report_test_run_started(self, test_result):
test_suite = self.state_machine.suite()
if not self.state_machine.suite().is_started:
if not test_suite.test_num or not test_suite.suite_name:
test_suite.suite_name = self.get_suite_name()
test_suite.test_num = test_result.num_tests
for listener in self.get_listeners():
suite_report = copy.copy(test_suite)
listener.__started__(LifeCycle.TestSuite, suite_report)
@classmethod
def output_stack_trace(cls, test_info):
if check_pub_key_exist():
return
if test_info.stacktrace:
stack_lines = test_info.stacktrace.split(r"\r\n")
LOG.error("Stacktrace information is:")
for line in stack_lines:
line.strip()
if line:
LOG.error(line)
@staticmethod
def check_legality(name):
@ -1154,44 +1200,7 @@ class OHJSUnitTestParser(IParser):
return True
def __done__(self):
suite_result = self.state_machine.suite()
suite_result.run_time = self.test_time
suite_result.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuites, suite,
suites_name=self.suites_name)
self.state_machine.current_suite = None
def mark_test_as_blocked(self, test):
if not self.state_machine.current_suite and not test.class_name:
return
suite_name = self.state_machine.current_suite.suite_name if \
self.state_machine.current_suite else self.get_suite_name()
suite_result = self.state_machine.suite(reset=True)
test_result = self.state_machine.test(reset=True)
suite_result.suite_name = suite_name or test.class_name
suite_result.suite_num = 1
test_result.test_class = test.class_name
test_result.test_name = test.test_name
test_result.stacktrace = "error_msg: marked blocked"
test_result.num_tests = 1
test_result.run_time = 0
test_result.code = ResultCode.BLOCKED.value
for listener in self.get_listeners():
suite_report = copy.copy(suite_result)
listener.__started__(LifeCycle.TestSuite, suite_report)
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__started__(LifeCycle.TestCase, test_result)
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__ended__(LifeCycle.TestCase, test_result)
for listener in self.get_listeners():
suite_report = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite_report,
is_clear=True)
self.__done__()
pass
def handle_suite_end(self):
suite_result = self.state_machine.suite()
@ -1201,6 +1210,108 @@ class OHJSUnitTestParser(IParser):
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
def handle_suites_end(self):
suite_result = self.state_machine.suite()
suite_result.run_time = self.test_time
suite_result.is_completed = True
for listener in self.get_listeners():
if listener.__class__.__name__ == "ReportListener":
self._cal_result(listener)
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuites, suite,
suites_name=self.suites_name)
self.state_machine.current_suite = None
def _cal_result(self, report_listener):
result_len = len(report_listener.result)
suites_len = len(report_listener.suites)
if result_len > suites_len:
diff_result_tuple_list = report_listener.result[suites_len:]
report_listener.result = report_listener.result[:suites_len]
for diff_result_tuple in diff_result_tuple_list:
suite, case_result_list = diff_result_tuple
pos = self.runner.suite_recorder.get(suite.suite_name)[0]
report_listener.result[pos][1].extend(case_result_list)
self._handle_lacking_one_testcase(report_listener)
self._handle_lacking_whole_suite(report_listener)
def _handle_lacking_one_testcase(self, report_listener):
for suite in report_listener.suites.value():
test_des_list = self.runner.expect_tests_dict.get(
suite.suite_name, [])
pos = self.runner.suite_recorder.get(suite.suite_name)[0]
if len(test_des_list) == len(report_listener.result[pos][1]):
continue
for test_des in test_des_list:
is_contain = False
for case in report_listener.result[pos][1]:
if case.test_name == test_des.test_name:
is_contain = True
break
if not is_contain:
test_result = self.state_machine.test(reset=True)
test_result.test_class = test_des.class_name
test_result.test_name = test_des.test_name
test_result.stacktrace = "error_msg:mark blocked"
test_result.num_tests = 1
test_result.run_time = 0
test_result.code = ResultCode.BLOCKED.value
report_listener.result[pos][1].append(test_result)
def _handle_lacking_whole_suite(self, report_listener):
all_suite_set = set(self.runner.expect_tests_dict.keys())
un_suite_set = set()
if len(all_suite_set) > len(report_listener.suites):
suite_name_set = set()
for suite in report_listener.suites.values():
suite_name_set.add(suite.suite_name)
un_suite_set.union(all_suite_set.difference(suite_name_set))
for un_suite in un_suite_set:
test_des_list = self.runner.expect_tests_dict.get(
un_suite.suite_name, [])
current_suite = self.state_machine.suite(reset=True)
current_suite.test_num = len(test_des_list)
current_suite.suite_name = self.current_value
for listener in self.get_listeners():
suite = copy.copy(current_suite)
listener.__started__(lifecycle.TestSuite, suite)
for test in test_des_list:
test_result = self.state_machine.test(reset=True)
test_result.test_class = test.class_name
test_result.test_name = test.test_name
test_result.stacktrace = "error_msg:mark blocked"
test_result.num_tests = 1
test_result.run_time = 0
test_result.current = self.state_machine.running_test_index + 1
test_result.code = ResultCode.BLOCKED.value
test_result = copy.copy(test_result)
for listener in self.get_listeners():
listener.__started__(LifeCycle.TestCase, test_result)
test_result = copy.copy(test_result)
for listener in self.get_listeners():
listener.__ended__(LifeCycle.TestCase, test_result)
current_suite.run_time = self.test_time
current_suite.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(current_suite)
listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
def notify_task_finished(self):
self.handle_suites_end()
def _handle_result_msg(self, line):
if OHJSUnitItemConstants.APP_DIED.value in line:
test_result = self.state_machine.test()
suite = self.state_machine.suite()
if not test_result.is_completed:
if self.check_legality(test_result.test_class) and \
self.check_legality(test_result.test_name):
self.report_result(test_result)
self.clear_current_test_info()
if not suite.is_completed:
self.handle_suite_end()
@Plugin(type=Plugin.PARSER, id=CommonParserType.oh_jsunit_list)
class OHJSUnitTestListParser(IParser):
@ -1227,7 +1338,11 @@ class OHJSUnitTestListParser(IParser):
suite_dict_list = json.loads(self.json_str).get("suites", [])
for suite_dict in suite_dict_list:
for class_name, test_name_dict_list in suite_dict.items():
self.tests_dict.update({class_name.strip(): []})
for test_name_dict in test_name_dict_list:
for test_name in test_name_dict.values():
test = TestDescription(class_name, test_name)
test = TestDescription(class_name.strip(),
test_name.strip())
self.tests_dict.get(
class_name.strip()).append(test)
self.tests.append(test)

View File

@ -87,6 +87,7 @@ class DeviceStateMonitor(object):
try:
result = self.device.get_recover_result(retry=0)
if self.device.check_recover_result(result):
time.sleep(3)
return True
except Exception as exception:
self.device.log.error("wait for boot complete exception: %s"