!229 新增支撑SSTS测试套

Merge pull request !229 from liguangjie/master
This commit is contained in:
openharmony_ci 2023-05-29 04:35:14 +00:00 committed by Gitee
commit 9003690302
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
5 changed files with 525 additions and 34 deletions

View File

@ -20,6 +20,10 @@ import os
import time
import json
import stat
import shutil
import re
from datetime import datetime
from enum import Enum
from xdevice import ConfigConst
from xdevice import ParamError
@ -40,18 +44,52 @@ from xdevice import DeviceTestType
from xdevice import CommonParserType
from xdevice import FilePermission
from xdevice import ResourceManager
from xdevice import get_file_absolute_path
from xdevice import exec_cmd
from ohos.testkit.kit import oh_jsunit_para_parse
from ohos.executor.listener import CollectingPassListener
from ohos.constants import CKit
from ohos.environment.dmlib import process_command_ret
__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver"]
__all__ = ["OHJSUnitTestDriver", "OHKernelTestDriver",
"OHYaraTestDriver", "oh_jsunit_para_parse"]
TIME_OUT = 300 * 1000
LOG = platform_logger("OpenHarmony")
def oh_jsunit_para_parse(runner, junit_paras):
junit_paras = dict(junit_paras)
test_type_list = ["function", "performance", "reliability", "security"]
size_list = ["small", "medium", "large"]
level_list = ["0", "1", "2", "3"]
for para_name in junit_paras.keys():
para_name = para_name.strip()
para_values = junit_paras.get(para_name, [])
if para_name == "class":
runner.add_arg(para_name, ",".join(para_values))
elif para_name == "notClass":
runner.add_arg(para_name, ",".join(para_values))
elif para_name == "testType":
if para_values[0] not in test_type_list:
continue
# function/performance/reliability/security
runner.add_arg(para_name, para_values[0])
elif para_name == "size":
if para_values[0] not in size_list:
continue
# size small/medium/large
runner.add_arg(para_name, para_values[0])
elif para_name == "level":
if para_values[0] not in level_list:
continue
# 0/1/2/3/4
runner.add_arg(para_name, para_values[0])
elif para_name == "stress":
runner.add_arg(para_name, para_values[0])
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_kernel_test)
class OHKernelTestDriver(IDriver):
"""
@ -763,3 +801,389 @@ class OHRustTestDriver(IDriver):
def __result__(self):
return self.result if os.path.exists(self.result) else ""
class OHYaraConfig(Enum):
HAP_FILE = "hap-file"
BUNDLE_NAME = "bundle-name"
RUNNER = "runner"
TESTCASE_CLASS = "class"
OS_FULLNAME_LIST = "osFullNameList"
VULNERABILITIES = "vulnerabilities"
VUL_ID = "vul_id"
OPENHARMONY_SA = "openharmony-sa"
AFFECTED_VERSION = "affected_versions"
MONTH = "month"
SEVERITY = "severity"
VUL_DESCRIPTION = "vul_description"
DISCLOSURE = "disclosure"
AFFECTED_FILES = "affected_files"
YARA_RULES = "yara_rules"
PASS = "pass"
FAIL = "fail"
BLOCK = "block"
ERROR_MSG_001 = "The patch label is longer than two months (60 days), " \
"which violates the OHCA agreement [https://compatibility.openharmony.cn/]."
ERROR_MSG_002 = "This test case is beyond the patch label scope and does not need to be executed."
ERROR_MSG_003 = "Modify the code according to the patch requirements: "
class VulItem:
vul_id = ""
month = ""
severity = ""
vul_description = dict()
disclosure = dict()
affected_files = ""
affected_versions = ""
yara_rules = ""
trace = ""
final_risk = OHYaraConfig.PASS.value
complete = False
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_yara_test)
class OHYaraTestDriver(IDriver):
def __init__(self):
self.result = ""
self.error_message = ""
self.config = None
self.tool_hap_info = dict()
self.security_patch = None
self.system_version = None
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def __execute__(self, request):
try:
LOG.debug("Start to execute open harmony yara test")
self.result = os.path.join(
request.config.report_path, "result",
'.'.join((request.get_module_name(), "xml")))
self.config = request.config
self.config.device = request.config.environment.devices[0]
config_file = request.root.source.config_file
suite_file = request.root.source.source_file
if not suite_file:
raise ParamError(
"test source '%s' not exists" %
request.root.source.source_string, error_no="00110")
LOG.debug("Test case file path: %s" % suite_file)
self.config.device.set_device_report_path(request.config.report_path)
self._run_oh_yara(config_file, request)
except Exception as exception:
self.error_message = exception
if not getattr(exception, "error_no", ""):
setattr(exception, "error_no", "03409")
LOG.exception(self.error_message, exc_info=False, error_no="03409")
finally:
serial = "{}_{}".format(str(request.config.device.__get_serial__()),
time.time_ns())
log_tar_file_name = "{}_{}".format(
request.get_module_name(), str(serial).replace(":", "_"))
self.config.device.device_log_collector.stop_hilog_task(
log_tar_file_name)
self.result = check_result_report(
request.config.report_path, self.result, self.error_message)
def _get_driver_config(self, json_config):
yara_bin = get_config_value('yara-bin',
json_config.get_driver(), False)
version_mapping_file = get_config_value('version-mapping-file',
json_config.get_driver(), False)
vul_info_file = get_config_value('vul-info-file',
json_config.get_driver(), False)
# get absolute file path
self.config.yara_bin = get_file_absolute_path(yara_bin)
self.config.version_mapping_file = get_file_absolute_path(version_mapping_file)
self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path])
# get tool hap info
tool_hap_info = get_config_value('tools-hap-info',
json_config.get_driver(), False)
if tool_hap_info:
self.tool_hap_info[OHYaraConfig.HAP_FILE.value] = \
tool_hap_info.get("hap-file", "")
self.tool_hap_info[OHYaraConfig.BUNDLE_NAME.value] = \
tool_hap_info.get("bundle-name", "")
self.tool_hap_info[OHYaraConfig.RUNNER.value] = \
tool_hap_info.get("runner", "")
self.tool_hap_info[OHYaraConfig.TESTCASE_CLASS.value] = \
tool_hap_info.get("class", "")
def _run_oh_yara(self, config_file, request=None):
message_list = list()
json_config = JsonParser(config_file)
self._get_driver_config(json_config)
# get device info
self.security_patch = self.config.device.execute_shell_command(
"param get const.ohos.version.security_patch").strip()
self.system_version = self.config.device.execute_shell_command(
"param get const.ohos.fullname").strip()
if "fail" in self.system_version:
self._get_full_name_by_tool_hap()
vul_items = self._get_vul_items()
# if security patch expire, case fail
current_date_str = datetime.now().strftime('%Y-%m')
if self._check_if_expire_or_risk(current_date_str):
LOG.info("Security patch has expired. Set all case fail.")
for _, item in enumerate(vul_items):
item.complete = True
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
else:
LOG.info("Security patch is shorter than two months. Start yara test.")
# parse version mapping file
mapping_info = self._do_parse_json(self.config.version_mapping_file)
os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
# check if system version in version mapping list
vul_version = os_full_name_list.get(self.system_version, None)
# not in the maintenance scope, skip all case
if vul_version is None:
LOG.debug("The system version is not in the maintenance scope, skip it. "
"system versions is {}".format(self.system_version))
else:
for _, item in enumerate(vul_items):
LOG.debug("Affected files: {}".format(item.affected_files))
for index, affected_file in enumerate(item.affected_files):
has_inter = False
for i, _ in enumerate(item.affected_versions):
if self._check_if_intersection(vul_version, item.affected_versions[i]):
has_inter = True
break
if not has_inter:
LOG.debug("Yara rule [{}] affected versions has no intersection "
"in mapping version, skip it. Mapping version is {}, "
"affected versions is {}".format(item.vul_id, vul_version,
item.affected_versions))
continue
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if not os.path.exists(local_path):
os.makedirs(local_path)
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
self.config.device.pull_file(affected_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(affected_file))
if not os.path.exists(affected_file):
LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
item.final_risk = OHYaraConfig.PASS.value
continue
cmd = [self.config.yara_bin, yara_file, affected_file]
result = exec_cmd(cmd)
LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
if "testcase pass" in result:
item.final_risk = OHYaraConfig.PASS.value
break
else:
if self._check_if_expire_or_risk(item.month, check_risk=True):
item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
item.disclosure.get("zh", ""))
item.final_risk = OHYaraConfig.FAIL.value
else:
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
# if no risk delete files, if rule has risk keep it
if item.final_risk != OHYaraConfig.FAIL.value:
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if os.path.exists(local_path):
LOG.debug(
"Yara rule [{}] has no risk, remove affected files.".format(
item.yara_rules[index]))
shutil.rmtree(local_path)
item.complete = True
self._generate_yara_report(request, vul_items, message_list)
self._generate_xml_report(request, vul_items, message_list)
def _check_if_expire_or_risk(self, date_str, expire_time=2, check_risk=False):
from dateutil.relativedelta import relativedelta
self.security_patch = self.security_patch.replace(' ', '')
self.security_patch = self.security_patch.replace('/', '-')
# get current date
source_date = datetime.strptime(date_str, '%Y-%m')
security_patch_date = datetime.strptime(self.security_patch[:-3], '%Y-%m')
# check if expire 2 months
rd = relativedelta(source_date, security_patch_date)
months = rd.months + (rd.years * 12)
if check_risk:
# vul time before security patch time no risk
LOG.debug("Security patch time: {}, vul time: {}, delta_months: {}"
.format(self.security_patch[:-3], date_str, months))
if months > 0:
return False
else:
return True
else:
# check if security patch time expire current time 2 months
LOG.debug("Security patch time: {}, current time: {}, delta_months: {}"
.format(self.security_patch[:-3], date_str, months))
if months > expire_time:
return True
else:
return False
@staticmethod
def _check_if_intersection(source_version, dst_version):
# para dst_less_sor control if dst less than source
def _do_check(soruce, dst, dst_less_sor=True):
if re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', soruce) and \
re.match(r'^\d{1,3}.\d{1,3}.\d{1,3}', dst):
source_vers = soruce.split(".")
dst_vers = dst.split(".")
for index, _ in enumerate(source_vers):
if dst_less_sor:
# check if all source number less than dst number
if int(source_vers[index]) < int(dst_vers[index]):
return False
else:
# check if all source number larger than dst number
if int(source_vers[index]) > int(dst_vers[index]):
return False
return True
return False
source_groups = source_version.split("-")
dst_groups = dst_version.split("-")
if source_version == dst_version:
return True
elif len(source_groups) == 1 and len(dst_groups) == 1:
return source_version == dst_version
elif len(source_groups) == 1 and len(dst_groups) == 2:
return _do_check(source_groups[0], dst_groups[0]) and \
_do_check(source_groups[0], dst_groups[1], dst_less_sor=False)
elif len(source_groups) == 2 and len(dst_groups) == 1:
return _do_check(source_groups[0], dst_groups[0], dst_less_sor=False) and \
_do_check(source_groups[1], dst_groups[0])
elif len(source_groups) == 2 and len(dst_groups) == 2:
return _do_check(source_groups[0], dst_groups[1], dst_less_sor=False) and \
_do_check(source_groups[1], dst_groups[0])
return False
def _get_vul_items(self):
vul_items = list()
vul_info = self._do_parse_json(self.config.vul_info_file)
vulnerabilities = vul_info.get(OHYaraConfig.VULNERABILITIES.value, [])
for _, vul in enumerate(vulnerabilities):
affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, [])
item = VulItem()
item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "")
item.affected_versions = affected_versions
item.month = vul.get(OHYaraConfig.MONTH.value, "")
item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")
item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "")
item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "")
item.affected_files = \
vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
OHYaraConfig.AFFECTED_FILES.value, [])
item.yara_rules = \
vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
OHYaraConfig.YARA_RULES.value, [])
vul_items.append(item)
LOG.debug("Vul size is {}".format(len(vul_items)))
return vul_items
@staticmethod
def _do_parse_json(file_path):
json_content = None
if not os.path.exists(file_path):
raise ParamError("The json file {} does not exist".format(
file_path), error_no="00110")
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(file_path, flags, modes),
"r", encoding="utf-8") as file_content:
json_content = json.load(file_content)
if json_content is None:
raise ParamError("The json file {} parse error".format(
file_path), error_no="00110")
return json_content
def _get_full_name_by_tool_hap(self):
if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value, None) is None:
raise ParamError("The json file not set tool hap.", error_no="00110")
# check if tool hap has installed
result = self.config.device.execute_shell_command(
"bm dump -a | grep {}".format(self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
LOG.debug(result)
if self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value) not in result:
hap_path = get_file_absolute_path(self.tool_hap_info.get(OHYaraConfig.HAP_FILE.value))
self.config.device.push_file(hap_path, "/data/local/tmp")
result = self.config.device.execute_shell_command(
"bm install -p /data/local/tmp/{}".format(os.path.basename(hap_path)))
LOG.debug(result)
self.config.device.execute_shell_command(
"mkdir -p /data/app/el2/100/base/{}/haps/entry/files".format(
self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value)))
self.config.device.execute_shell_command(
"aa test -b {} -m entry -s unittest {} -s type testcase -s class {} -s timeout 5000".format(
self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value),
self.tool_hap_info.get(OHYaraConfig.RUNNER.value),
self.tool_hap_info.get(OHYaraConfig.TESTCASE_CLASS.value)))
self.system_version = self.config.device.execute_shell_command(
"cat /data/app/el2/100/base/{}/haps/entry/files/osFullNameInfo.txt".format(
self.tool_hap_info.get(OHYaraConfig.BUNDLE_NAME.value))).replace('"', '')
LOG.debug(self.system_version)
def _generate_yara_report(self, request, vul_items, result_message):
import csv
result_message.clear()
yara_report = os.path.join(request.config.report_path, "vul_info_{}.csv"
.format(request.config.device.device_sn))
if os.path.exists(yara_report):
data = []
else:
data = [
["设备版本号:", self.system_version, "设备安全补丁标签:", self.security_patch],
["漏洞编号", "严重程度", "披露时间", "检测结果", "修复建议", "漏洞描述"]
]
fd = os.open(yara_report, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
for _, item in enumerate(vul_items):
data.append([item.vul_id, item.severity,
item.month, item.final_risk,
item.disclosure.get("zh", ""), item.vul_description.get("zh", "")])
result = "{}|{}|{}|{}|{}|{}|{}\n".format(
item.vul_id, item.severity,
item.month, item.final_risk,
item.disclosure.get("zh", ""), item.vul_description.get("zh", ""),
item.trace)
result_message.append(result)
with os.fdopen(fd, "a", newline='') as file_handler:
writer = csv.writer(file_handler)
writer.writerows(data)
def _generate_xml_report(self, request, vul_items, message_list):
result_message = "".join(message_list)
listener_copy = request.listeners.copy()
parsers = get_plugin(
Plugin.PARSER, CommonParserType.oh_yara)
if parsers:
parsers = parsers[:1]
for listener in listener_copy:
listener.device_sn = self.config.device.device_sn
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = request.get_module_name()
parser_instance.vul_items = vul_items
parser_instance.listeners = listener_copy
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
process_command_ret(result_message, handler)
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -192,6 +192,7 @@ class HdcMonitor:
self.main_hdc_connection)
self.list_targets()
time.sleep(2)
except (HdcError, Exception) as _:
self.handle_exception_monitor_loop()
break

View File

@ -1500,3 +1500,98 @@ class OHRustTestParser(IParser):
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite, suite_report=True)
@Plugin(type=Plugin.PARSER, id=CommonParserType.oh_yara)
class OHYaraTestParser(IParser):
last_line = ""
pattern = r"(\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}) "
def __init__(self):
self.state_machine = StateRecorder()
self.suites_name = ""
self.vul_items = None
self.listeners = []
def get_listeners(self):
return self.listeners
def __process__(self, lines):
self.parse(lines)
def __done__(self):
pass
def parse(self, lines):
for line in lines:
if line:
self.handle_suites_started_tag()
self.handle_suite_started_tag()
self.handle_one_test_tag(line)
self.handle_suite_ended_tag()
self.handle_suites_ended_tag()
def handle_suites_started_tag(self):
self.state_machine.get_suites(reset=True)
test_suites = self.state_machine.get_suites()
test_suites.suites_name = self.suites_name
test_suites.test_num = len(self.vul_items)
for listener in self.get_listeners():
suite_report = copy.copy(test_suites)
listener.__started__(LifeCycle.TestSuites, suite_report)
def handle_suites_ended_tag(self):
suites = self.state_machine.get_suites()
suites.is_completed = True
for listener in self.get_listeners():
listener.__ended__(LifeCycle.TestSuites, test_result=suites,
suites_name=suites.suites_name)
def handle_one_test_tag(self, message):
status_dict = {"pass": ResultCode.PASSED, "fail": ResultCode.FAILED,
"block": ResultCode.BLOCKED}
message = message.strip().split("|")
test_name = message[0]
status = status_dict.get(message[3])
trace = message[6] if message[3] else ""
run_time = 0
test_suite = self.state_machine.suite()
test_result = self.state_machine.test(reset=True)
test_result.test_class = test_suite.suite_name
test_result.test_name = test_name
test_result.run_time = run_time
test_result.code = status.value
test_result.stacktrace = trace
test_result.current = self.state_machine.running_test_index + 1
self.state_machine.suite().run_time += run_time
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__started__(LifeCycle.TestCase, test_result)
test_suites = self.state_machine.get_suites()
self.state_machine.test().is_completed = True
test_suites.test_num += 1
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__ended__(LifeCycle.TestCase, result)
self.state_machine.running_test_index += 1
def handle_suite_started_tag(self):
self.state_machine.suite(reset=True)
self.state_machine.running_test_index = 0
test_suite = self.state_machine.suite()
test_suite.suite_name = self.suites_name
test_suite.test_num = 1
for listener in self.get_listeners():
suite_report = copy.copy(test_suite)
listener.__started__(LifeCycle.TestSuite, suite_report)
def handle_suite_ended_tag(self):
suite_result = self.state_machine.suite()
suites = self.state_machine.get_suites()
suite_result.run_time = suite_result.run_time
suites.run_time += suite_result.run_time
suite_result.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)

View File

@ -53,7 +53,7 @@ from ohos.environment.dmlib import CollectingOutputReceiver
__all__ = ["STSKit", "CommandKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
"ConfigKit", "AppInstallKit", "ComponentKit", "PermissionKit",
"junit_dex_para_parse", "oh_jsunit_para_parse", "SmartPerfKit"]
"junit_dex_para_parse", "SmartPerfKit"]
MAX_WAIT_COUNT = 4
TARGET_SDK_VERSION = 22
@ -1052,37 +1052,6 @@ def get_app_name(hap_app):
return app_name
def oh_jsunit_para_parse(runner, junit_paras):
junit_paras = dict(junit_paras)
test_type_list = ["function", "performance", "reliability", "security"]
size_list = ["small", "medium", "large"]
level_list = ["0", "1", "2", "3"]
for para_name in junit_paras.keys():
para_name = para_name.strip()
para_values = junit_paras.get(para_name, [])
if para_name == "class":
runner.add_arg(para_name, ",".join(para_values))
elif para_name == "notClass":
runner.add_arg(para_name, ",".join(para_values))
elif para_name == "testType":
if para_values[0] not in test_type_list:
continue
# function/performance/reliability/security
runner.add_arg(para_name, para_values[0])
elif para_name == "size":
if para_values[0] not in size_list:
continue
# size small/medium/large
runner.add_arg(para_name, para_values[0])
elif para_name == "level":
if para_values[0] not in level_list:
continue
# 0/1/2/3/4
runner.add_arg(para_name, para_values[0])
elif para_name == "stress":
runner.add_arg(para_name, para_values[0])
@Plugin(type=Plugin.TEST_KIT, id=CKit.smartperf)
class SmartPerfKit(ITestKit):
def __init__(self):

View File

@ -124,6 +124,7 @@ class DeviceTestType(object):
oh_jsunit_test = "OHJSUnitTest"
hm_os_jsunit_test = "HMOSJSUnitTest"
oh_rust_test = "OHRustTest"
oh_yara_test = "OHYaraTest"
@dataclass
@ -198,6 +199,7 @@ class CommonParserType:
oh_jsunit = "OHJSUnit"
oh_jsunit_list = "OHJSUnitList"
oh_rust = "OHRust"
oh_yara = "OHYara"
@dataclass