This commit is contained in:
deveco_xdevice 2023-12-22 11:01:35 +08:00
commit f0c0d3eec5
3 changed files with 112 additions and 17 deletions

View File

@ -810,11 +810,13 @@ class OHYaraConfig(Enum):
VULNERABILITIES = "vulnerabilities"
VUL_ID = "vul_id"
OPENHARMONY_SA = "openharmony-sa"
CVE = "cve"
AFFECTED_VERSION = "affected_versions"
MONTH = "month"
SEVERITY = "severity"
VUL_DESCRIPTION = "vul_description"
DISCLOSURE = "disclosure"
OBJECT_TYPE = "object_type"
AFFECTED_FILES = "affected_files"
YARA_RULES = "yara_rules"
@ -833,6 +835,7 @@ class VulItem:
severity = ""
vul_description = dict()
disclosure = dict()
object_type = ""
affected_files = ""
affected_versions = ""
yara_rules = ""
@ -908,7 +911,8 @@ class OHYaraTestDriver(IDriver):
# get absolute file path
self.config.yara_bin = get_file_absolute_path(yara_bin)
self.config.version_mapping_file = get_file_absolute_path(version_mapping_file)
self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path])
if vul_info_file != "vul_info_patch_label_test":
self.config.vul_info_file = get_file_absolute_path(vul_info_file, [self.config.testcases_path])
# get tool hap info
# default value
@ -932,7 +936,6 @@ class OHYaraTestDriver(IDriver):
json_config = JsonParser(config_file)
self._get_driver_config(json_config)
# get device info
self.security_patch = self.config.device.execute_shell_command(
"param get const.ohos.version.security_patch").strip()
@ -941,30 +944,47 @@ class OHYaraTestDriver(IDriver):
if "fail" in self.system_version:
self._get_full_name_by_tool_hap()
vul_info_file = get_config_value('vul-info-file', json_config.get_driver(), False)
# Extract patch labels into separate testcase
if vul_info_file == "vul_info_patch_label_test":
vul_items = list()
item = VulItem()
item.vul_id = "Patch-label-test"
item.month = "Patch-label-test"
vul_items = self._get_vul_items()
# if security patch expire, case fail
current_date_str = datetime.now().strftime('%Y-%m')
if self._check_if_expire_or_risk(current_date_str):
LOG.info("Security patch has expired. Set all case fail.")
for _, item in enumerate(vul_items):
item.complete = True
# security patch verify
current_date_str = datetime.now().strftime('%Y-%m')
if self._check_if_expire_or_risk(current_date_str):
LOG.info("Security patch has expired.")
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
else:
LOG.info("Security patch is shorter than two months.")
item.final_risk = OHYaraConfig.PASS.value
item.complete = True
vul_items.append(item)
else:
LOG.info("Security patch is shorter than two months. Start yara test.")
vul_items = self._get_vul_items()
# parse version mapping file
mapping_info = self._do_parse_json(self.config.version_mapping_file)
os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
# check if system version in version mapping list
vul_version = os_full_name_list.get(self.system_version, None)
# not in the maintenance scope, skip all case
if not vul_version and "OpenHarmony" in self.system_version:
vul_version_list = self.system_version.split("-")[-1].split(".")[:2]
vul_version_list.append("0")
vul_version = ".".join(vul_version_list)
if vul_version is None:
LOG.debug("The system version is not in the maintenance scope, skip it. "
"system versions is {}".format(self.system_version))
else:
for _, item in enumerate(vul_items):
LOG.debug("Affected files: {}".format(item.affected_files))
LOG.debug("Object type: {}".format(item.object_type))
for index, affected_file in enumerate(item.affected_files):
has_inter = False
for i, _ in enumerate(item.affected_versions):
@ -981,14 +1001,35 @@ class OHYaraTestDriver(IDriver):
request.get_module_name(), item.yara_rules[index].split('.')[0])
if not os.path.exists(local_path):
os.makedirs(local_path)
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
self.config.device.pull_file(affected_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(affected_file))
if item.object_type == "kernel_linux":
img_file = "/data/local/tmp/boot_linux.img"
package_file = self.kernel_packing(affected_file, img_file)
if not package_file:
LOG.error("Execute failed. Not found file named {}, "
"please check the input".format(affected_file))
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "Failed to pack the kernel file."
continue
self.config.device.pull_file(package_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(package_file))
else:
self.config.device.pull_file(affected_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(affected_file))
if not os.path.exists(affected_file):
LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
item.final_risk = OHYaraConfig.PASS.value
continue
cmd = [self.config.yara_bin, yara_file, affected_file]
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
if item.object_type == "kernel_linux":
affected_file_processed = self.file_process_kernel(affected_file, local_path)
if not affected_file_processed:
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "Kernel file extraction error"
continue
cmd = [self.config.yara_bin, yara_file, affected_file_processed]
else:
cmd = [self.config.yara_bin, yara_file, affected_file]
result = exec_cmd(cmd)
LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
if "testcase pass" in result:
@ -996,9 +1037,9 @@ class OHYaraTestDriver(IDriver):
break
else:
if self._check_if_expire_or_risk(item.month, check_risk=True):
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
item.disclosure.get("zh", ""))
item.final_risk = OHYaraConfig.FAIL.value
else:
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
@ -1079,6 +1120,56 @@ class OHYaraTestDriver(IDriver):
_do_check(source_groups[1], dst_groups[0])
return False
def kernel_packing(self, affected_file, img_file):
cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip()
LOG.debug("kernel file detail: {}".format(cmd_result))
if "No such file or directory" in cmd_result:
return False
link_file = cmd_result.split(" ")[-1]
pack_result = self.config.device.execute_shell_command(f"dd if={link_file} of={img_file}")
LOG.debug("kernel package detail: {}".format(pack_result))
if "No such file or directory" in pack_result:
return False
return img_file
def file_process_kernel(self, affected_file, local_path):
try:
from vmlinux_to_elf.elf_symbolizer import ElfSymbolizer
from vmlinux_to_elf.architecture_detecter import ArchitectureGuessError
from vmlinux_to_elf.vmlinuz_decompressor import obtain_raw_kernel_from_file
except ImportError:
LOG.error("Please install the tool of vmlinux_to_elf before running.")
return False
# 内核文件解析慢,解析过一次放到公共目录下,该月份下用例共用
dir_path = os.path.dirname(local_path)
processed_file = os.path.join(dir_path, "vmlinux.elf")
if os.path.exists(processed_file):
LOG.debug("The kernel file has been extracted, will reuse the previous pasing file.")
return processed_file
# 1 解压
try:
exec_cmd("7z")
except NameError:
LOG.error("Please install the command of 7z before running.")
return False
decompress_result = exec_cmd(f"7z x {affected_file} -o{local_path}")
LOG.debug("kernel file decompress detail: {}".format(decompress_result))
# 2 解析
print("Kernel file extraction will take a few minutes, please wait patiently...")
input_file = os.path.join(local_path, "extlinux", "Image")
output_file = processed_file
if not input_file:
LOG.error("An error occurred when decompressing the kernel file.")
return False
with open(input_file, "rb") as kernel_bin:
try:
ElfSymbolizer(obtain_raw_kernel_from_file(kernel_bin.read()), output_file)
except ArchitectureGuessError:
LOG.error("An error occurred when pasing the kernel file.")
return None
return output_file
def _get_vul_items(self):
vul_items = list()
vul_info = self._do_parse_json(self.config.vul_info_file)
@ -1086,12 +1177,13 @@ class OHYaraTestDriver(IDriver):
for _, vul in enumerate(vulnerabilities):
affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, [])
item = VulItem()
item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "")
item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.CVE.value, "")
item.affected_versions = affected_versions
item.month = vul.get(OHYaraConfig.MONTH.value, "")
item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")
item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "")
item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "")
item.object_type = vul.get(OHYaraConfig.OBJECT_TYPE.value, "")
item.affected_files = \
vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
OHYaraConfig.AFFECTED_FILES.value, [])

View File

@ -503,7 +503,8 @@ class ShellKit(ITestKit):
LOG.info("No teardown-localcommand to run, skipping!")
else:
for command in self.tear_down_local_command:
subprocess.run(command)
ret = subprocess.run(command, capture_output=True, text=True)
LOG.info("Teardown-localcommand run: {}".format(ret))
@Plugin(type=Plugin.TEST_KIT, id=CKit.wifi)

View File

@ -468,6 +468,8 @@ class VisionHelper:
self.summary_element = summary_element
exec_info = self._set_exec_info(report_path, task_info)
suites = self._set_suites_info()
if exec_info.test_type == "SSTS":
suites.sort(key=lambda x: x.module_name, reverse=True)
summary = self._set_summary_info()
return exec_info, summary, suites