!265 Update 补丁标签抽出为单独的用例

Merge pull request !265 from zhangjingyu/master
This commit is contained in:
openharmony_ci 2023-12-18 07:29:42 +00:00 committed by Gitee
commit d80d85d11c
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 104 additions and 106 deletions

View File

@ -810,6 +810,7 @@ class OHYaraConfig(Enum):
VULNERABILITIES = "vulnerabilities"
VUL_ID = "vul_id"
OPENHARMONY_SA = "openharmony-sa"
CVE = "cve"
AFFECTED_VERSION = "affected_versions"
MONTH = "month"
SEVERITY = "severity"
@ -933,8 +934,6 @@ class OHYaraTestDriver(IDriver):
message_list = list()
json_config = JsonParser(config_file)
self._get_driver_config(json_config)
# get device info
self.security_patch = self.config.device.execute_shell_command(
"param get const.ohos.version.security_patch").strip()
@ -943,117 +942,116 @@ class OHYaraTestDriver(IDriver):
if "fail" in self.system_version:
self._get_full_name_by_tool_hap()
vul_info_file = get_config_value('vul-info-file', json_config.get_driver(), False)
# Extract patch labels into separate testcase
if vul_info_file == "vul_info_patch_label_test":
vul_items = list()
item = VulItem()
item.vul_id = "Patch-label-test"
item.month = "Patch-label-test"
vul_items = self._get_vul_items()
# if security patch expire, case fail
current_date_str = datetime.now().strftime('%Y-%m')
if self._check_if_expire_or_risk(current_date_str):
LOG.info("Security patch has expired. Set all case blocked, but still execute.")
for _, item in enumerate(vul_items):
# item.complete = True
item.final_risk = OHYaraConfig.BLOCK.value
# security patch verify
current_date_str = datetime.now().strftime('%Y-%m')
if self._check_if_expire_or_risk(current_date_str):
LOG.info("Security patch has expired.")
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
else:
LOG.info("Security patch is shorter than two months.")
item.final_risk = OHYaraConfig.PASS.value
item.complete = True
vul_items.append(item)
else:
LOG.info("Security patch is shorter than two months. Start yara test.")
# if security patch expire, still execute
# parse version mapping file
mapping_info = self._do_parse_json(self.config.version_mapping_file)
os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
# check if system version in version mapping list
vul_version = os_full_name_list.get(self.system_version, None)
# not in the maintenance scope, skip all case
if not vul_version and "OpenHarmony" in self.system_version:
vul_version_list = self.system_version.split("-")[-1].split(".")[:2]
vul_version_list.append("0")
vul_version = ".".join(vul_version_list)
if vul_version is None:
LOG.debug("The system version is not in the maintenance scope, skip it. "
"system versions is {}".format(self.system_version))
else:
for _, item in enumerate(vul_items):
LOG.debug("Affected files: {}".format(item.affected_files))
LOG.debug("Object type: {}".format(item.object_type))
for index, affected_file in enumerate(item.affected_files):
has_inter = False
for i, _ in enumerate(item.affected_versions):
if self._check_if_intersection(vul_version, item.affected_versions[i]):
has_inter = True
break
if not has_inter:
LOG.debug("Yara rule [{}] affected versions has no intersection "
"in mapping version, skip it. Mapping version is {}, "
"affected versions is {}".format(item.vul_id, vul_version,
item.affected_versions))
if item.final_risk == OHYaraConfig.BLOCK.value:
item.trace = "{}\\n{}".format(item.trace, "if ignore it, this testcase pass")
continue
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if not os.path.exists(local_path):
os.makedirs(local_path)
if item.object_type == "kernel_linux":
img_file = "/data/local/tmp/boot_linux.img"
package_file = self.kernel_packing(affected_file, img_file)
if not package_file:
LOG.error("Execute failed. Not found file named {}, please check the input".format(affected_file))
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "Failed to pack the kernel file."
self._get_driver_config(json_config)
vul_items = self._get_vul_items()
# parse version mapping file
mapping_info = self._do_parse_json(self.config.version_mapping_file)
os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
# check if system version in version mapping list
vul_version = os_full_name_list.get(self.system_version, None)
# not in the maintenance scope, skip all case
if not vul_version and "OpenHarmony" in self.system_version:
vul_version_list = self.system_version.split("-")[-1].split(".")[:2]
vul_version_list.append("0")
vul_version = ".".join(vul_version_list)
if vul_version is None:
LOG.debug("The system version is not in the maintenance scope, skip it. "
"system versions is {}".format(self.system_version))
else:
for _, item in enumerate(vul_items):
LOG.debug("Affected files: {}".format(item.affected_files))
LOG.debug("Object type: {}".format(item.object_type))
for index, affected_file in enumerate(item.affected_files):
has_inter = False
for i, _ in enumerate(item.affected_versions):
if self._check_if_intersection(vul_version, item.affected_versions[i]):
has_inter = True
break
if not has_inter:
LOG.debug("Yara rule [{}] affected versions has no intersection "
"in mapping version, skip it. Mapping version is {}, "
"affected versions is {}".format(item.vul_id, vul_version,
item.affected_versions))
continue
self.config.device.pull_file(package_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(package_file))
else:
self.config.device.pull_file(affected_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(affected_file))
if not os.path.exists(affected_file):
LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
if item.final_risk == OHYaraConfig.BLOCK.value:
item.trace = "{}\\n{}".format(item.trace, "if ignore it, this testcase pass")
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if not os.path.exists(local_path):
os.makedirs(local_path)
if item.object_type == "kernel_linux":
img_file = "/data/local/tmp/boot_linux.img"
package_file = self.kernel_packing(affected_file, img_file)
if not package_file:
LOG.error("Execute failed. Not found file named {}, "
"please check the input".format(affected_file))
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "Failed to pack the kernel file."
continue
self.config.device.pull_file(package_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(package_file))
else:
self.config.device.pull_file(affected_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(affected_file))
if not os.path.exists(affected_file):
LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
item.final_risk = OHYaraConfig.PASS.value
continue
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
if item.object_type == "kernel_linux":
affected_file_processed = self.file_process_kernel(affected_file, local_path)
if not affected_file_processed:
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "Kernel file extraction error"
continue
cmd = [self.config.yara_bin, yara_file, affected_file_processed]
else:
cmd = [self.config.yara_bin, yara_file, affected_file]
result = exec_cmd(cmd)
LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
if item.final_risk == OHYaraConfig.BLOCK.value:
item.trace = "{}\\n{}".format(item.trace, "if ignore it, ")
if "testcase pass" in result:
item.trace = "{}{}".format(item.trace, "this testcase pass")
break
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
if item.object_type == "kernel_linux":
affected_file_processed = self.file_process_kernel(affected_file, local_path)
if not affected_file_processed:
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "Kernel file extraction error"
continue
cmd = [self.config.yara_bin, yara_file, affected_file_processed]
else:
item.trace = "{}{}".format(item.trace, "this testcase failed")
else:
cmd = [self.config.yara_bin, yara_file, affected_file]
result = exec_cmd(cmd)
LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
if "testcase pass" in result:
item.final_risk = OHYaraConfig.PASS.value
break
else:
if self._check_if_expire_or_risk(item.month, check_risk=True):
item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
item.disclosure.get("zh", ""))
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
item.disclosure.get("zh", ""))
else:
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
# if no risk delete files, if rule has risk keep it
if item.final_risk != OHYaraConfig.FAIL.value:
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if os.path.exists(local_path):
LOG.debug(
"Yara rule [{}] has no risk, remove affected files.".format(
item.yara_rules[index]))
shutil.rmtree(local_path)
item.complete = True
# if no risk delete files, if rule has risk keep it
if item.final_risk != OHYaraConfig.FAIL.value:
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if os.path.exists(local_path):
LOG.debug(
"Yara rule [{}] has no risk, remove affected files.".format(
item.yara_rules[index]))
shutil.rmtree(local_path)
item.complete = True
self._generate_yara_report(request, vul_items, message_list)
self._generate_xml_report(request, vul_items, message_list)
@ -1125,12 +1123,12 @@ class OHYaraTestDriver(IDriver):
cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip()
LOG.debug("kernel file detail: {}".format(cmd_result))
if "No such file or directory" in cmd_result:
return None
return False
link_file = cmd_result.split(" ")[-1]
pack_result = self.config.device.execute_shell_command(f"dd if={link_file} of={img_file}")
LOG.debug("kernel package detail: {}".format(pack_result))
if "No such file or directory" in pack_result:
return None
return False
return img_file
def file_process_kernel(self, affected_file, local_path):
@ -1138,9 +1136,9 @@ class OHYaraTestDriver(IDriver):
from vmlinux_to_elf.elf_symbolizer import ElfSymbolizer
from vmlinux_to_elf.architecture_detecter import ArchitectureGuessError
from vmlinux_to_elf.vmlinuz_decompressor import obtain_raw_kernel_from_file
except:
except ImportError:
LOG.error("Please install the tool of vmlinux_to_elf before running.")
return None
return False
# 内核文件解析慢,解析过一次放到公共目录下,该月份下用例共用
dir_path = os.path.dirname(local_path)
@ -1151,9 +1149,9 @@ class OHYaraTestDriver(IDriver):
# 1 解压
try:
exec_cmd("7z")
except:
except NameError:
LOG.error("Please install the command of 7z before running.")
return None
return False
decompress_result = exec_cmd(f"7z x {affected_file} -o{local_path}")
LOG.debug("kernel file decompress detail: {}".format(decompress_result))
# 2 解析
@ -1162,7 +1160,7 @@ class OHYaraTestDriver(IDriver):
output_file = processed_file
if not input_file:
LOG.error("An error occurred when decompressing the kernel file.")
return None
return False
with open(input_file, "rb") as kernel_bin:
try:
ElfSymbolizer(obtain_raw_kernel_from_file(kernel_bin.read()), output_file)
@ -1170,8 +1168,6 @@ class OHYaraTestDriver(IDriver):
LOG.error("An error occurred when pasing the kernel file.")
return None
return output_file
def _get_vul_items(self):
vul_items = list()
@ -1180,7 +1176,7 @@ class OHYaraTestDriver(IDriver):
for _, vul in enumerate(vulnerabilities):
affected_versions = vul.get(OHYaraConfig.AFFECTED_VERSION.value, [])
item = VulItem()
item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.OPENHARMONY_SA.value, "")
item.vul_id = vul.get(OHYaraConfig.VUL_ID.value, dict()).get(OHYaraConfig.CVE.value, "")
item.affected_versions = affected_versions
item.month = vul.get(OHYaraConfig.MONTH.value, "")
item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")

View File

@ -468,6 +468,8 @@ class VisionHelper:
self.summary_element = summary_element
exec_info = self._set_exec_info(report_path, task_info)
suites = self._set_suites_info()
if exec_info.test_type == "SSTS":
suites.sort(key=lambda x: x.module_name, reverse=True)
summary = self._set_summary_info()
return exec_info, summary, suites