!263 Add kernel file process

Merge pull request !263 from zhangjingyu/master
This commit is contained in:
openharmony_ci 2023-12-11 02:47:14 +00:00 committed by Gitee
commit c078e4d469
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

View File

@ -815,6 +815,7 @@ class OHYaraConfig(Enum):
SEVERITY = "severity"
VUL_DESCRIPTION = "vul_description"
DISCLOSURE = "disclosure"
OBJECT_TYPE = "object_type"
AFFECTED_FILES = "affected_files"
YARA_RULES = "yara_rules"
@ -833,6 +834,7 @@ class VulItem:
severity = ""
vul_description = dict()
disclosure = dict()
object_type = ""
affected_files = ""
affected_versions = ""
yara_rules = ""
@ -946,72 +948,112 @@ class OHYaraTestDriver(IDriver):
# if security patch expire, case fail
current_date_str = datetime.now().strftime('%Y-%m')
if self._check_if_expire_or_risk(current_date_str):
LOG.info("Security patch has expired. Set all case fail.")
LOG.info("Security patch has expired. Set all case blocked, but still execute.")
for _, item in enumerate(vul_items):
item.complete = True
item.final_risk = OHYaraConfig.FAIL.value
# item.complete = True
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_001.value)
else:
LOG.info("Security patch is shorter than two months. Start yara test.")
# parse version mapping file
mapping_info = self._do_parse_json(self.config.version_mapping_file)
os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
# check if system version in version mapping list
vul_version = os_full_name_list.get(self.system_version, None)
# not in the maintenance scope, skip all case
if vul_version is None:
LOG.debug("The system version is not in the maintenance scope, skip it. "
"system versions is {}".format(self.system_version))
else:
for _, item in enumerate(vul_items):
LOG.debug("Affected files: {}".format(item.affected_files))
for index, affected_file in enumerate(item.affected_files):
has_inter = False
for i, _ in enumerate(item.affected_versions):
if self._check_if_intersection(vul_version, item.affected_versions[i]):
has_inter = True
break
if not has_inter:
LOG.debug("Yara rule [{}] affected versions has no intersection "
"in mapping version, skip it. Mapping version is {}, "
"affected versions is {}".format(item.vul_id, vul_version,
item.affected_versions))
# if security patch expire, still execute
# parse version mapping file
mapping_info = self._do_parse_json(self.config.version_mapping_file)
os_full_name_list = mapping_info.get(OHYaraConfig.OS_FULLNAME_LIST.value, None)
# check if system version in version mapping list
vul_version = os_full_name_list.get(self.system_version, None)
# not in the maintenance scope, skip all case
if not vul_version and "OpenHarmony" in self.system_version:
vul_version_list = self.system_version.split("-")[-1].split(".")[:2]
vul_version_list.append("0")
vul_version = ".".join(vul_version_list)
if vul_version is None:
LOG.debug("The system version is not in the maintenance scope, skip it. "
"system versions is {}".format(self.system_version))
else:
for _, item in enumerate(vul_items):
LOG.debug("Affected files: {}".format(item.affected_files))
LOG.debug("Object type: {}".format(item.object_type))
for index, affected_file in enumerate(item.affected_files):
has_inter = False
for i, _ in enumerate(item.affected_versions):
if self._check_if_intersection(vul_version, item.affected_versions[i]):
has_inter = True
break
if not has_inter:
LOG.debug("Yara rule [{}] affected versions has no intersection "
"in mapping version, skip it. Mapping version is {}, "
"affected versions is {}".format(item.vul_id, vul_version,
item.affected_versions))
if item.final_risk == OHYaraConfig.BLOCK.value:
item.trace = "{}\\n{}".format(item.trace, "if ignore it, this testcase pass")
continue
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if not os.path.exists(local_path):
os.makedirs(local_path)
if item.object_type == "kernel_linux":
img_file = "/data/local/tmp/boot_linux.img"
package_file = self.kernel_packing(affected_file, img_file)
if not package_file:
LOG.error("Execute failed. Not found file named {}, please check the input".format(affected_file))
item.final_risk = OHYaraConfig.FAIL.value
item.trace = "Failed to pack the kernel file."
continue
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if not os.path.exists(local_path):
os.makedirs(local_path)
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
self.config.device.pull_file(package_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(package_file))
else:
self.config.device.pull_file(affected_file, local_path)
affected_file = os.path.join(local_path, os.path.basename(affected_file))
if not os.path.exists(affected_file):
LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
if not os.path.exists(affected_file):
LOG.debug("affected file [{}] is not exist, skip it.".format(item.affected_files[index]))
if item.final_risk == OHYaraConfig.BLOCK.value:
item.trace = "{}\\n{}".format(item.trace, "if ignore it, this testcase pass")
else:
item.final_risk = OHYaraConfig.PASS.value
continue
yara_file = get_file_absolute_path(item.yara_rules[index], [self.config.testcases_path])
if item.object_type == "kernel_linux":
affected_file_processed = self.file_process_kernel(affected_file, local_path)
if not affected_file_processed:
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "Kernel file extraction error"
continue
cmd = [self.config.yara_bin, yara_file, affected_file_processed]
else:
cmd = [self.config.yara_bin, yara_file, affected_file]
result = exec_cmd(cmd)
LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
result = exec_cmd(cmd)
LOG.debug("Yara result: {}, affected file: {}".format(result, item.affected_files[index]))
if item.final_risk == OHYaraConfig.BLOCK.value:
item.trace = "{}\\n{}".format(item.trace, "if ignore it, ")
if "testcase pass" in result:
item.trace = "{}{}".format(item.trace, "this testcase pass")
break
else:
item.trace = "{}{}".format(item.trace, "this testcase failed")
else:
if "testcase pass" in result:
item.final_risk = OHYaraConfig.PASS.value
break
else:
if self._check_if_expire_or_risk(item.month, check_risk=True):
item.trace = "{}{}".format(OHYaraConfig.ERROR_MSG_003.value,
item.disclosure.get("zh", ""))
item.disclosure.get("zh", ""))
item.final_risk = OHYaraConfig.FAIL.value
else:
item.final_risk = OHYaraConfig.BLOCK.value
item.trace = "{}{}".format(item.trace, OHYaraConfig.ERROR_MSG_002.value)
# if no risk delete files, if rule has risk keep it
if item.final_risk != OHYaraConfig.FAIL.value:
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if os.path.exists(local_path):
LOG.debug(
"Yara rule [{}] has no risk, remove affected files.".format(
item.yara_rules[index]))
shutil.rmtree(local_path)
item.complete = True
# if no risk delete files, if rule has risk keep it
if item.final_risk != OHYaraConfig.FAIL.value:
local_path = os.path.join(request.config.report_path, OHYaraConfig.AFFECTED_FILES.value,
request.get_module_name(), item.yara_rules[index].split('.')[0])
if os.path.exists(local_path):
LOG.debug(
"Yara rule [{}] has no risk, remove affected files.".format(
item.yara_rules[index]))
shutil.rmtree(local_path)
item.complete = True
self._generate_yara_report(request, vul_items, message_list)
self._generate_xml_report(request, vul_items, message_list)
@ -1079,6 +1121,58 @@ class OHYaraTestDriver(IDriver):
_do_check(source_groups[1], dst_groups[0])
return False
def kernel_packing(self, affected_file, img_file):
cmd_result = self.config.device.execute_shell_command(f"ls -al {affected_file}").strip()
LOG.debug("kernel file detail: {}".format(cmd_result))
if "No such file or directory" in cmd_result:
return None
link_file = cmd_result.split(" ")[-1]
pack_result = self.config.device.execute_shell_command(f"dd if={link_file} of={img_file}")
LOG.debug("kernel package detail: {}".format(pack_result))
if "No such file or directory" in pack_result:
return None
return img_file
def file_process_kernel(self, affected_file, local_path):
try:
from vmlinux_to_elf.elf_symbolizer import ElfSymbolizer
from vmlinux_to_elf.architecture_detecter import ArchitectureGuessError
from vmlinux_to_elf.vmlinuz_decompressor import obtain_raw_kernel_from_file
except:
LOG.error("Please install the tool of vmlinux_to_elf before running.")
return None
# 内核文件解析慢,解析过一次放到公共目录下,该月份下用例共用
dir_path = os.path.dirname(local_path)
processed_file = os.path.join(dir_path, "vmlinux.elf")
if os.path.exists(processed_file):
LOG.debug("The kernel file has been extracted, will reuse the previous pasing file.")
return processed_file
# 1 解压
try:
exec_cmd("7z")
except:
LOG.error("Please install the command of 7z before running.")
return None
decompress_result = exec_cmd(f"7z x {affected_file} -o{local_path}")
LOG.debug("kernel file decompress detail: {}".format(decompress_result))
# 2 解析
print("Kernel file extraction will take a few minutes, please wait patiently...")
input_file = os.path.join(local_path, "extlinux", "Image")
output_file = processed_file
if not input_file:
LOG.error("An error occurred when decompressing the kernel file.")
return None
with open(input_file, "rb") as kernel_bin:
try:
ElfSymbolizer(obtain_raw_kernel_from_file(kernel_bin.read()), output_file)
except ArchitectureGuessError:
LOG.error("An error occurred when pasing the kernel file.")
return None
return output_file
def _get_vul_items(self):
vul_items = list()
vul_info = self._do_parse_json(self.config.vul_info_file)
@ -1092,6 +1186,7 @@ class OHYaraTestDriver(IDriver):
item.severity = vul.get(OHYaraConfig.SEVERITY.value, "")
item.vul_description = vul.get(OHYaraConfig.VUL_DESCRIPTION.value, "")
item.disclosure = vul.get(OHYaraConfig.DISCLOSURE.value, "")
item.object_type = vul.get(OHYaraConfig.OBJECT_TYPE.value, "")
item.affected_files = \
vul["affected_device"]["standard"]["linux"]["arm"]["scan_strategy"]["ists"]["yara"].get(
OHYaraConfig.AFFECTED_FILES.value, [])