From 77f15d35dc1d2eca8fa594a1b00dd81447ec7986 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 17 Aug 2023 15:26:22 +0800 Subject: [PATCH 1/2] codex Signed-off-by: unknown --- build_pkcs7.py | 4 +- build_update.py | 177 +++++++++++++++----------------------- create_signed_data.py | 4 +- create_update_package.py | 14 +-- unpack_updater_package.py | 2 +- update_package.py | 95 ++++++++++---------- utils.py | 36 ++++---- vendor_script.py | 1 - 8 files changed, 143 insertions(+), 190 deletions(-) diff --git a/build_pkcs7.py b/build_pkcs7.py index bf861b1..d70b70d 100644 --- a/build_pkcs7.py +++ b/build_pkcs7.py @@ -33,7 +33,7 @@ operation_path = os.path.dirname(os.path.realpath(__file__)) CERT_PATH = os.path.join(operation_path, 'sign_cert/signing_cert.crt') BLOCK_SIZE = 8192 FOOTER_LENGTH = 6 -ZIP_ECOD_LENGTH = 22 +ZIP_EOCD_LENGTH = 22 DIGEST_SHA256 = 672 SHA256_HASH_LEN = 32 @@ -60,7 +60,7 @@ def calculate_package_hash(package_path): hash_sha256 = hashlib.sha256() length = 0 - remain_len = os.path.getsize(package_path) - ZIP_ECOD_LENGTH + remain_len = os.path.getsize(package_path) - ZIP_EOCD_LENGTH with open(package_path, 'rb') as package_file: while remain_len > BLOCK_SIZE: hash_sha256.update(package_file.read(BLOCK_SIZE)) diff --git a/build_update.py b/build_update.py index c0dc9ca..3271042 100644 --- a/build_update.py +++ b/build_update.py @@ -306,29 +306,21 @@ def check_incremental_args(no_zip, partition_file, source_package, :return: """ if "boot" in incremental_img_list: - UPDATE_LOGGER.print_log( - "boot cannot be incrementally processed!", - UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("boot cannot be incrementally processed!", UPDATE_LOGGER.ERROR_LOG) clear_resource(err_clear=True) return False if source_package is None: - UPDATE_LOGGER.print_log( - "The source package is missing, " - "cannot be incrementally processed!", - UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("The source package is missing, " + "cannot be incrementally processed!", UPDATE_LOGGER.ERROR_LOG) clear_resource(err_clear=True) return False if no_zip: - UPDATE_LOGGER.print_log( - "No ZIP mode, cannot be incrementally processed!", - UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("No ZIP mode, cannot be incrementally processed!", UPDATE_LOGGER.ERROR_LOG) clear_resource(err_clear=True) return False if partition_file is not None: - UPDATE_LOGGER.print_log( - "Partition file is not None, " - "cannot be incrementally processed!", - UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Partition file is not None, " + "cannot be incrementally processed!", UPDATE_LOGGER.ERROR_LOG) clear_resource(err_clear=True) return False @@ -336,8 +328,7 @@ def check_incremental_args(no_zip, partition_file, source_package, return False xml_path = '' if OPTIONS_MANAGER.source_package_dir is not False: - xml_path = os.path.join(OPTIONS_MANAGER.source_package_dir, - UPDATER_CONFIG, XML_FILE_PATH) + xml_path = os.path.join(OPTIONS_MANAGER.source_package_dir, UPDATER_CONFIG, XML_FILE_PATH) if OPTIONS_MANAGER.source_package_dir is False: OPTIONS_MANAGER.source_package_temp_obj = None OPTIONS_MANAGER.source_package_dir = None @@ -571,7 +562,11 @@ def increment_image_diff_processing( '-p', patch_file_obj.name, '-l', '4096']) sub_p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - output, _ = sub_p.communicate() + try: + output, _ = sub_p.communicate(timeout=1800) + except subprocess.TimeoutExpired: + sub_p.kill() + sub_p.wait() if sub_p.returncode != 0: raise ValueError(output) @@ -579,6 +574,22 @@ def increment_image_diff_processing( script_check_cmd_list, script_write_cmd_list, verse_script) +def add_incremental_command(verse_script, script_check_cmd_list, script_write_cmd_list): + """ + add command for increment_image_progressing to verse_script + :param verse_script: verse script + :param script_check_cmd_list: verse_script check command list + :param script_write_cmd_list: verse_script write command list + :return: + """ + verse_script.add_command("\n# ---- start incremental check here ----\n") + for each_check_cmd in script_check_cmd_list: + verse_script.add_command(each_check_cmd) + verse_script.add_command("\n# ---- start incremental write here ----\n") + for each_write_cmd in script_write_cmd_list: + verse_script.add_command(each_write_cmd) + + def increment_image_processing( verse_script, incremental_img_list, source_package_dir, target_package_dir): @@ -593,30 +604,19 @@ def increment_image_processing( script_check_cmd_list = [] script_write_cmd_list = [] patch_process = None - block_diff = 0 for each_img_name in OPTIONS_MANAGER.incremental_img_name_list: each_img = each_img_name[:-4] - each_src_image_path = \ - os.path.join(source_package_dir, - '%s.img' % each_img) - each_src_map_path = \ - os.path.join(source_package_dir, - '%s.map' % each_img) - each_tgt_image_path = \ - os.path.join(target_package_dir, - '%s.img' % each_img) - each_tgt_map_path = \ - os.path.join(target_package_dir, - '%s.map' % each_img) + each_src_image_path = os.path.join(source_package_dir, '%s.img' % each_img) + each_src_map_path = os.path.join(source_package_dir, '%s.map' % each_img) + each_tgt_image_path = os.path.join(target_package_dir, '%s.img' % each_img) + each_tgt_map_path = os.path.join(target_package_dir, '%s.map' % each_img) check_make_map_path(each_img) if filecmp.cmp(each_src_image_path, each_tgt_image_path): UPDATE_LOGGER.print_log( - "Source Image is the same as Target Image!" - "src image path: %s, tgt image path: %s" % - (each_src_image_path, each_tgt_image_path), - UPDATE_LOGGER.INFO_LOG) + "Source Image is the same as Target Image! src image path: %s, tgt image path: %s" + % (each_src_image_path, each_tgt_image_path), UPDATE_LOGGER.INFO_LOG) OPTIONS_MANAGER.incremental_img_list.remove(each_img) first_block_check_cmd = verse_script.first_block_check(each_img) abort_cmd = verse_script.abort(each_img) @@ -630,16 +630,10 @@ def increment_image_processing( if not os.path.exists(each_src_map_path): src_generate_map = generate_image_map_file(each_src_image_path, each_src_map_path, each_img) - if not src_generate_map: - UPDATE_LOGGER.print_log("The source %s.img file" - "generate map file failed. " % each_img) if not os.path.exists(each_tgt_map_path): tgt_generate_map = generate_image_map_file(each_tgt_image_path, each_tgt_map_path, each_img) - if not tgt_generate_map: - UPDATE_LOGGER.print_log("The target %s.img file" - "generate map file failed. " % each_img) if not src_generate_map or not tgt_generate_map: if increment_image_diff_processing(each_img, each_src_image_path, each_tgt_image_path, @@ -649,11 +643,8 @@ def increment_image_processing( clear_resource(err_clear=True) return False - block_diff += 1 - src_image_class = \ - IncUpdateImage(each_src_image_path, each_src_map_path) - tgt_image_class = \ - IncUpdateImage(each_tgt_image_path, each_tgt_map_path) + src_image_class = IncUpdateImage(each_src_image_path, each_src_map_path) + tgt_image_class = IncUpdateImage(each_tgt_image_path, each_tgt_map_path) OPTIONS_MANAGER.src_image = src_image_class OPTIONS_MANAGER.tgt_image = tgt_image_class @@ -661,36 +652,23 @@ def increment_image_processing( if inc_image: src_image_class, tgt_image_class = inc_image() - transfers_manager = TransfersManager( - each_img, tgt_image_class, src_image_class) + transfers_manager = TransfersManager(each_img, tgt_image_class, src_image_class) transfers_manager.find_process_needs() actions_list = transfers_manager.get_action_list() - graph_process = GigraphProcess(actions_list, src_image_class, - tgt_image_class) + graph_process = GigraphProcess(actions_list, src_image_class, tgt_image_class) actions_list = graph_process.actions_list patch_process = \ - patch_package_process.PatchProcess( - each_img, tgt_image_class, src_image_class, actions_list) + patch_package_process.PatchProcess(each_img, tgt_image_class, src_image_class, actions_list) patch_process.patch_process() - patch_process.write_script(each_img, script_check_cmd_list, - script_write_cmd_list, verse_script) + patch_process.write_script(each_img, script_check_cmd_list, script_write_cmd_list, verse_script) OPTIONS_MANAGER.incremental_block_file_obj_dict[each_img] = patch_process.package_patch_zip if not check_patch_file(patch_process): - UPDATE_LOGGER.print_log( - 'Verify the incremental result failed!', - UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log('Verify the incremental result failed!', UPDATE_LOGGER.ERROR_LOG) raise RuntimeError - verse_script.add_command( - "\n# ---- start incremental check here ----\n") - for each_check_cmd in script_check_cmd_list: - verse_script.add_command(each_check_cmd) - verse_script.add_command( - "\n# ---- start incremental write here ----\n") - for each_write_cmd in script_write_cmd_list: - verse_script.add_command(each_write_cmd) + add_incremental_command(verse_script, script_check_cmd_list, script_write_cmd_list) return True @@ -791,6 +769,18 @@ def check_args(private_key, source_package, target_package, update_package): return True +def unpack_package_processing(): + if OPTIONS_MANAGER.unpack_package_path: + package = UnpackPackage() + if not package.unpack_package(): + UPDATE_LOGGER.print_log("Unpack update package.bin failed!", UPDATE_LOGGER.ERROR_LOG) + clear_resource(err_clear=True) + sys.exit(1) + UPDATE_LOGGER.print_log("Unpack update package.bin success!") + clear_resource() + sys.exit(0) + + create_entrance_args() @@ -803,34 +793,18 @@ def main(): OPTIONS_MANAGER.product = PRODUCT source_package, target_package, update_package, no_zip, not_l2, \ - partition_file, signing_algorithm, hash_algorithm, private_key = \ - get_args() + partition_file, signing_algorithm, hash_algorithm, private_key = get_args() if not_l2: no_zip = True # Unpack updater package - if OPTIONS_MANAGER.unpack_package_path: - package = UnpackPackage() - if not package.unpack_package(): - UPDATE_LOGGER.print_log( - "Unpack update package .bin failed!", UPDATE_LOGGER.ERROR_LOG) - clear_resource(err_clear=True) - return - UPDATE_LOGGER.print_log("Unpack update package .bin success!") - clear_resource() - return + unpack_package_processing() if OPTIONS_MANAGER.sd_card: - if source_package is not None or \ - OPTIONS_MANAGER.xml_path is not None or \ - partition_file is not None: - UPDATE_LOGGER.print_log( - "SD Card updater, " - "the -S/-xp/-pf parameter is not required!", - UPDATE_LOGGER.ERROR_LOG) + if source_package is not None or OPTIONS_MANAGER.xml_path is not None or partition_file is not None: + UPDATE_LOGGER.print_log("SDCard updater:-S/-xp/-pf parameter is not required!", UPDATE_LOGGER.ERROR_LOG) raise RuntimeError - if check_args(private_key, source_package, - target_package, update_package) is False: + if check_args(private_key, source_package, target_package, update_package) is False: clear_resource(err_clear=True) return @@ -840,8 +814,7 @@ def main(): return # Create a Script object. - prelude_script, verse_script, refrain_script, ending_script = \ - get_script_obj() + prelude_script, verse_script, refrain_script, ending_script = get_script_obj() # Create partition. if partition_file is not None: @@ -849,8 +822,7 @@ def main(): updater_partitions_cmd = verse_script.updater_partitions() verse_script.add_command(updater_partitions_cmd) - partition_file_obj, partitions_list, partitions_file_path_list = \ - parse_partition_file_xml(partition_file) + partition_file_obj, partitions_list, partitions_file_path_list = parse_partition_file_xml(partition_file) if partition_file_obj is False: clear_resource(err_clear=True) return False @@ -858,35 +830,26 @@ def main(): OPTIONS_MANAGER.full_img_list = partitions_list OPTIONS_MANAGER.full_image_path_list = partitions_file_path_list - if incremental_processing( - no_zip, partition_file, source_package, verse_script) is False: + if incremental_processing(no_zip, partition_file, source_package, verse_script) is False: clear_resource(err_clear=True) return # Full processing if len(OPTIONS_MANAGER.full_img_list) != 0: verse_script.add_command("\n# ---- full image ----\n") - full_update_image = \ - FullUpdateImage(OPTIONS_MANAGER.target_package_dir, - OPTIONS_MANAGER.full_img_list, - OPTIONS_MANAGER.full_img_name_list, - verse_script, OPTIONS_MANAGER.full_image_path_list, - no_zip=OPTIONS_MANAGER.no_zip) - full_image_content_len_list, full_image_file_obj_list = \ - full_update_image.update_full_image() - if full_image_content_len_list is False or \ - full_image_file_obj_list is False: + full_update_image = FullUpdateImage(OPTIONS_MANAGER.target_package_dir, + OPTIONS_MANAGER.full_img_list, OPTIONS_MANAGER.full_img_name_list, verse_script, + OPTIONS_MANAGER.full_image_path_list, no_zip=OPTIONS_MANAGER.no_zip) + full_image_content_len_list, full_image_file_obj_list = full_update_image.update_full_image() + if full_image_content_len_list is False or full_image_file_obj_list is False: clear_resource(err_clear=True) return - OPTIONS_MANAGER.full_image_content_len_list, \ - OPTIONS_MANAGER.full_image_file_obj_list = \ + OPTIONS_MANAGER.full_image_content_len_list, OPTIONS_MANAGER.full_image_file_obj_list = \ full_image_content_len_list, full_image_file_obj_list # Generate the update package. - build_re = build_update_package(no_zip, update_package, - prelude_script, verse_script, - refrain_script, ending_script) - if build_re is False: + if build_update_package( + no_zip, update_package, prelude_script, verse_script, refrain_script, ending_script) is False: clear_resource(err_clear=True) return # Clear resources. diff --git a/create_signed_data.py b/create_signed_data.py index 122996f..8ebe0fc 100644 --- a/create_signed_data.py +++ b/create_signed_data.py @@ -30,8 +30,10 @@ def sign_func_sha256(sign_file, private_key_file): """ hash_sha256 = hashlib.sha256() with open(sign_file, 'rb') as file: - while chunk := file.read(BLOCK_SIZE): + chunk = file.read(BLOCK_SIZE) + while chunk: hash_sha256.update(chunk) + chunk = file.read(BLOCK_SIZE) signature = sign_digest(hash_sha256.digest(), private_key_file) if signature == False: UPDATE_LOGGER.print_log("sign digest failed", log_type=UPDATE_LOGGER.ERROR_LOG) diff --git a/create_update_package.py b/create_update_package.py index 50dcb93..50887a8 100644 --- a/create_update_package.py +++ b/create_update_package.py @@ -319,8 +319,7 @@ class CreatePackage(object): with os.fdopen(package_fd, "wb+") as package_file: # Add information to package if not self.write_pkginfo(package_file): - UPDATE_LOGGER.print_log( - "Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG) return False # Add component to package self.compinfo_offset = UPGRADE_FILE_HEADER_LEN @@ -328,8 +327,7 @@ class CreatePackage(object): self.head_list.entry_count * self.upgrade_compinfo_size + \ UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN for i in range(0, self.head_list.entry_count): - UPDATE_LOGGER.print_log("Add component %s" - % self.component_list[i].component_addr) + UPDATE_LOGGER.print_log("Add component %s" % self.component_list[i].component_addr) if not self.write_component_info(self.component_list[i], package_file): UPDATE_LOGGER.print_log("write component info failed: %s" % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG) @@ -345,10 +343,8 @@ class CreatePackage(object): package_file.write( (self.head_list.describe_package_id.decode().ljust(UPGRADE_RESERVE_LEN, "\0")).encode()) except IOError: - UPDATE_LOGGER.print_log( - "Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG) return False - self.hash_info_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN if OPTIONS_MANAGER.sd_card: try: @@ -359,8 +355,7 @@ class CreatePackage(object): self.hash_info_offset += len(hash_check_data.hashinfo_value + hash_check_data.hashdata) except IOError: - UPDATE_LOGGER.print_log( - "Add hash check data failed!", log_type=UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Add hash check data failed!", log_type=UPDATE_LOGGER.ERROR_LOG) return False self.sign_header(SIGN_ALGO_RSA, hash_check_data, package_file) self.component_offset = self.hash_info_offset @@ -369,6 +364,5 @@ class CreatePackage(object): UPDATE_LOGGER.print_log("write component failed: %s" % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG) return False - UPDATE_LOGGER.print_log("Write update package complete") return True \ No newline at end of file diff --git a/unpack_updater_package.py b/unpack_updater_package.py index 52055e9..3b59156 100644 --- a/unpack_updater_package.py +++ b/unpack_updater_package.py @@ -128,7 +128,7 @@ class UnpackPackage(object): def create_image_file(self, package_file): component_name, component_type, component_size = \ self.parse_component(package_file) - if not component_name and not component_type and not component_size: + if component_name is None and component_type is None and component_size is None: UPDATE_LOGGER.print_log( "get component_info failed!", UPDATE_LOGGER.ERROR_LOG) return False diff --git a/update_package.py b/update_package.py index 9716f08..4476848 100644 --- a/update_package.py +++ b/update_package.py @@ -97,8 +97,7 @@ def create_update_bin(): :return update_bin_obj: Update file object. If exception occurs, return False. """ - update_bin_obj = tempfile.NamedTemporaryFile( - dir=OPTIONS_MANAGER.update_package, prefix="update_bin-") + update_bin_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="update_bin-") head_value_list = OPTIONS_MANAGER.head_info_list component_dict = OPTIONS_MANAGER.component_info_dict @@ -115,8 +114,7 @@ def create_update_bin(): all_image_name = full_img_list sort_component_dict = collect.OrderedDict() for each_image_name in all_image_name: - sort_component_dict[each_image_name] = \ - component_dict.get(each_image_name) + sort_component_dict[each_image_name] = component_dict.get(each_image_name) component_dict = copy.deepcopy(sort_component_dict) head_list = get_head_list(len(component_dict), head_value_list) @@ -135,15 +133,12 @@ def create_update_bin(): sign_algo = SIGN_ALGO_RSA # create bin - package = CreatePackage(head_list, component_list, save_patch, - OPTIONS_MANAGER.private_key) + package = CreatePackage(head_list, component_list, save_patch, OPTIONS_MANAGER.private_key) if not package.create_package(): - UPDATE_LOGGER.print_log( - "Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG) return False - UPDATE_LOGGER.print_log( - "Create update package .bin complete! path: %s" % update_bin_obj.name) + UPDATE_LOGGER.print_log("Create update package .bin complete! path: %s" % update_bin_obj.name) return update_bin_obj @@ -160,8 +155,7 @@ def get_component_list(all_image_file_obj_list, component_dict): component_list = pkg_components() if not OPTIONS_MANAGER.not_l2: if OPTIONS_MANAGER.partition_file_obj is not None: - extend_component_list = \ - EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + extend_component_list = EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path, OPTIONS_MANAGER.board_list_file_path, OPTIONS_MANAGER.partition_file_obj.name] @@ -177,8 +171,7 @@ def get_component_list(all_image_file_obj_list, component_dict): if idx < len(extend_component_list): file_path = extend_path_list[idx] else: - file_path = \ - all_image_file_obj_list[idx - len(extend_component_list)].name + file_path = all_image_file_obj_list[idx - len(extend_component_list)].name digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm) if not digest: return @@ -189,11 +182,9 @@ def get_component_list(all_image_file_obj_list, component_dict): binascii.a2b_hex(digest.encode('utf-8'))) component_list[idx].file_path = file_path.encode("utf-8") if not OPTIONS_MANAGER.not_l2: - component_list[idx].component_addr = \ - ('/%s' % component[0]).encode("utf-8") + component_list[idx].component_addr = ('/%s' % component[0]).encode("utf-8") else: - component_list[idx].component_addr = \ - ('%s' % component[0]).encode("utf-8") + component_list[idx].component_addr = ('%s' % component[0]).encode("utf-8") component_list[idx].version = component[4].encode("utf-8") component_list[idx].size = os.path.getsize(file_path) component_list[idx].id = int(component[1]) @@ -339,16 +330,14 @@ def create_build_tools_zip(): get_tools_component_list(count, opera_script_dict) total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj register_script_file_obj = OPTIONS_MANAGER.register_script_file_obj - update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, - UPDATE_EXE_FILE_NAME) + update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir, UPDATE_EXE_FILE_NAME) if not os.path.exists(update_exe_path): UPDATE_LOGGER.print_log( "updater_binary file does not exist!path: %s" % update_exe_path, log_type=UPDATE_LOGGER.ERROR_LOG) return False - file_obj = tempfile.NamedTemporaryFile( - dir=OPTIONS_MANAGER.update_package, prefix="build_tools-") + file_obj = tempfile.NamedTemporaryFile(dir=OPTIONS_MANAGER.update_package, prefix="build_tools-") files_to_sign = [] zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED) # file name will be prefixed by build_tools in hash signed data @@ -374,6 +363,37 @@ def create_build_tools_zip(): return file_obj +def do_sign_package(update_package, update_file_name): + signed_package = os.path.join( + update_package, "%s.zip" % update_file_name) + OPTIONS_MANAGER.signed_package = signed_package + if os.path.exists(signed_package): + os.remove(signed_package) + + sign_ota_package = \ + OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT) + if sign_ota_package: + return sign_ota_package() + else: + return sign_package() + + +def get_update_file_name(): + if OPTIONS_MANAGER.sd_card : + package_type = "sd" + elif OPTIONS_MANAGER.source_package : + package_type = "diff" + else : + package_type = "full" + if OPTIONS_MANAGER.not_l2: + update_file_name = ''.join( + ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")]) + else : + update_file_name = ''.join( + ["updater_", package_type]) + return update_file_name + + def do_zip_update_package(): zip_file = zipfile.ZipFile(OPTIONS_MANAGER.update_package_file_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) @@ -398,6 +418,7 @@ def do_zip_update_package(): zip_file.write(patch_obj.name, "%s.patch.dat" % partition) zip_file.close() + return True def create_hsd_for_build_tools(zip_file, files_to_sign): @@ -438,18 +459,7 @@ def build_update_package(no_zip, update_package, prelude_script, else: return False - if OPTIONS_MANAGER.sd_card : - package_type = "sd" - elif OPTIONS_MANAGER.source_package : - package_type = "diff" - else : - package_type = "full" - if OPTIONS_MANAGER.not_l2: - update_file_name = ''.join( - ["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")]) - else : - update_file_name = ''.join( - ["updater_", package_type]) + update_file_name = get_update_file_name() if not no_zip: update_package_path = os.path.join( @@ -467,20 +477,11 @@ def build_update_package(no_zip, update_package, prelude_script, return False OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj - do_zip_update_package() + if not do_zip_update_package(): + UPDATE_LOGGER.print_log("Zip update package fail", UPDATE_LOGGER.ERROR_LOG) + return False - signed_package = os.path.join( - update_package, "%s.zip" % update_file_name) - OPTIONS_MANAGER.signed_package = signed_package - if os.path.exists(signed_package): - os.remove(signed_package) - - sign_ota_package = \ - OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT) - if sign_ota_package: - sign_result = sign_ota_package() - else: - sign_result = sign_package() + sign_result = do_sign_package(update_package, update_file_name) if not sign_result: UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG) diff --git a/utils.py b/utils.py index 8f28c09..d6d1401 100644 --- a/utils.py +++ b/utils.py @@ -21,14 +21,15 @@ import json import os import shutil import tempfile -import zipfile from collections import OrderedDict +import xmltodict +import zipfile + +from build_pkcs7 import sign_ota_package from copy import copy from ctypes import cdll from cryptography.hazmat.primitives import hashes from log_exception import UPDATE_LOGGER -import xmltodict -from build_pkcs7 import sign_ota_package operation_path = os.path.dirname(os.path.realpath(__file__)) PRODUCT = 'hi3516' @@ -284,8 +285,7 @@ def parse_update_config(xml_path): with open(xml_path, 'r') as xml_file: xml_str = xml_file.read() else: - UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" % - xml_path, UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" % xml_path, UPDATE_LOGGER.ERROR_LOG) ret_params = [False, False, False, False, False, False, False] return ret_params xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8') @@ -299,44 +299,38 @@ def parse_update_config(xml_path): head_list.pop() whole_list = [] difference_list = [] - component_dict = {} + comp_dict = {} full_image_path_list = [] if not OPTIONS_MANAGER.not_l2: - expand_component(component_dict) + expand_component(comp_dict) if isinstance(component_info, OrderedDict) or isinstance(component_info, dict): component_info = [component_info] if component_info is None: ret_params = [[], {}, [], [], '', [], False] return ret_params for component in component_info: + if component['@compAddr'] == 'userdata' and not OPTIONS_MANAGER.sd_card: + continue component_list = list(component.values()) component_list.pop() - if component['@compAddr'] in (whole_list + difference_list): - UPDATE_LOGGER.print_log("This component %s repeats!" % - component['@compAddr'], - UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("This component %s repeats!" % component['@compAddr'], UPDATE_LOGGER.ERROR_LOG) ret_params = [False, False, False, False, False, False, False] return ret_params if component['@compType'] == '0': whole_list.append(component['@compAddr']) - OPTIONS_MANAGER.full_img_name_list.\ - append(split_img_name(component['#text'])) - tem_path = os.path.join(OPTIONS_MANAGER.target_package_dir, - component.get("#text", None)) + OPTIONS_MANAGER.full_img_name_list.append(split_img_name(component['#text'])) + tem_path = os.path.join(OPTIONS_MANAGER.target_package_dir, component.get("#text", None)) full_image_path_list.append(tem_path) - component_dict[component['@compAddr']] = component_list + comp_dict[component['@compAddr']] = component_list elif component['@compType'] == '1': difference_list.append(component['@compAddr']) - OPTIONS_MANAGER.incremental_img_name_list.\ - append(split_img_name(component['#text'])) + OPTIONS_MANAGER.incremental_img_name_list.append(split_img_name(component['#text'])) UPDATE_LOGGER.print_log('XML file parsing completed!') - ret_params = [head_list, component_dict, - whole_list, difference_list, package_version, - full_image_path_list] + ret_params = [head_list, comp_dict, whole_list, difference_list, package_version, full_image_path_list] return ret_params diff --git a/vendor_script.py b/vendor_script.py index d646a1c..edb6a23 100644 --- a/vendor_script.py +++ b/vendor_script.py @@ -80,7 +80,6 @@ class ExtensionCmdRegister: """ self.__cmd_in_so_dict = {} - def generate_register_cmd_script(self): """ Generate the register script. From 0a3d20c7a303eb987ed92f7bbe775d1675f4d479 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 18 Aug 2023 10:52:29 +0800 Subject: [PATCH 2/2] codex Signed-off-by: unknown --- build_update.py | 2 +- create_hashdata.py | 46 ++++++++++++++++++++----------------- script_generator.py | 2 +- unpack_updater_package.py | 2 +- update_package.py | 4 ++-- utils.py | 48 +++++++++++++++++++-------------------- 6 files changed, 54 insertions(+), 50 deletions(-) diff --git a/build_update.py b/build_update.py index 3271042..2ac3ea4 100644 --- a/build_update.py +++ b/build_update.py @@ -796,7 +796,7 @@ def main(): partition_file, signing_algorithm, hash_algorithm, private_key = get_args() if not_l2: no_zip = True - + # Unpack updater package unpack_package_processing() diff --git a/create_hashdata.py b/create_hashdata.py index 77cfafc..237122c 100644 --- a/create_hashdata.py +++ b/create_hashdata.py @@ -58,6 +58,7 @@ HASH_INFO_FMT = "<3HI" HASH_DATA_HEADER_FMT = "<32sHI" HASH_DATA_ADDR_FMT = "<2I" + class CreateHash(object): """ Create the component hash data @@ -128,7 +129,8 @@ class CreateHash(object): component_len = os.path.getsize(component.file_path) block_num = component_len // HASH_BLOCK_SIZE component_name = component.component_addr.decode().ljust(COMPONENT_NAME_SIZE, "\0") - UPDATE_LOGGER.print_log("calc component hash component name:%s %d" % (component_name,len(component_name))) + UPDATE_LOGGER.print_log( + "calc component hash component name:%s %d" % (component_name, len(component_name))) total_block = block_num + 1 if component_len % HASH_BLOCK_SIZE > 0 else block_num self.hashdata += struct.pack(HASH_DATA_HEADER_FMT, component_name.encode(), total_block, component_len) @@ -193,25 +195,27 @@ class CreateHash(object): return True def parse_print_hashdata(self, save_path): - hash_check_file_p = open(os.path.join(save_path + "hash_check_file_parse"), "wb+") - hash_check_file_p.write(("hash info:").encode()) - hash_check_file_p.write((HashType(self.hash_type.value).name + ' ' + str(self.hash_digest_size) + \ - ' ' + str(self.component_num) + ' ' + str(self.block_size) + '\n').encode()) + hash_check_fd = open(os.path.join(save_path + "hash_check_file_parse"), os.O_RDWR | os.O_CREAT, 0o755) + with os.fdopen(hash_check_fd, "wb+") as hash_check_file_p: + hash_check_file_p.write(("hash info:").encode()) + hash_check_file_p.write(("%s %s %s %s\n" % ( + HashType(self.hash_type.value).name, str(self.hash_digest_size), + str(self.component_num), str(self.block_size))).encode()) - offset = 0 - for i in range(0, self.component_num): - hash_check_file_p.write((self.hashdata_list[offset][0] + '\n').encode()) - hash_check_file_p.write((str(self.hashdata_list[offset][1]) + ' ' + \ - str(self.hashdata_list[offset][2]) + '\n').encode()) - for j in range(0, self.hashdata_list[offset][1]): - index = offset + 1 - hashdata_hexstr = "".join("%02x" % b for b in self.hashdata_list[j + index][2]) - hash_check_file_p.write((str(self.hashdata_list[j + index][0]) + ' ' + \ - str(self.hashdata_list[j + index][1]) + ' ' + hashdata_hexstr + \ - '\n').encode()) - offset += (1 + self.hashdata_list[offset][1]) + offset = 0 + for i in range(0, self.component_num): + hash_check_file_p.write(("%s\n" % (self.hashdata_list[offset][0])).encode()) + hash_check_file_p.write(("%s %s\n" % ( + str(self.hashdata_list[offset][1]), str(self.hashdata_list[offset][2]))).encode()) + for j in range(0, self.hashdata_list[offset][1]): + index = offset + 1 + hashdata_hexstr = "".join("%02x" % b for b in self.hashdata_list[j + index][2]) + hash_check_file_p.write(("%s" % ( + str(self.hashdata_list[j + index][0]), str(self.hashdata_list[j + index][1]), + hashdata_hexstr)).encode()) - signdata_hexstr = "".join("%02x" % b for b in self.signdata) - hash_check_file_p.write(("hash sign:").encode()) - hash_check_file_p.write(signdata_hexstr.encode()) - hash_check_file_p.close() \ No newline at end of file + offset += (1 + self.hashdata_list[offset][1]) + + signdata_hexstr = "".join("%02x" % b for b in self.signdata) + hash_check_file_p.write(("hash sign:").encode()) + hash_check_file_p.write(signdata_hexstr.encode()) diff --git a/script_generator.py b/script_generator.py index cd9493c..63fb707 100644 --- a/script_generator.py +++ b/script_generator.py @@ -231,7 +231,7 @@ class VerseScript(Script): """ cmd = 'update_partitions("/%s");\n' % PARTITION_FILE return cmd - + def full_image_update(self, update_file_name): cmd = 'update_from_bin("%s");\n' % update_file_name return cmd diff --git a/unpack_updater_package.py b/unpack_updater_package.py index 3b59156..aa6e220 100644 --- a/unpack_updater_package.py +++ b/unpack_updater_package.py @@ -110,7 +110,7 @@ class UnpackPackage(object): package_file.seek(self.addr_offset) component_addr = package_file.read(self.addr_size) component_addr = component_addr.split(b"\x00")[0].decode('utf-8') - + package_file.seek(self.type_offset) component_type_buffer = package_file.read(COMPONENT_TYPE_SIZE) component_type = struct.unpack(COMPONENT_TYPE_FMT, component_type_buffer) diff --git a/update_package.py b/update_package.py index 4476848..47b78f7 100644 --- a/update_package.py +++ b/update_package.py @@ -131,7 +131,7 @@ def create_update_bin(): sign_algo = SIGN_ALGO_PSS else: sign_algo = SIGN_ALGO_RSA - + # create bin package = CreatePackage(head_list, component_list, save_patch, OPTIONS_MANAGER.private_key) if not package.create_package(): @@ -460,7 +460,7 @@ def build_update_package(no_zip, update_package, prelude_script, return False update_file_name = get_update_file_name() - + if not no_zip: update_package_path = os.path.join( update_package, '%s_unsigned.zip' % update_file_name) diff --git a/utils.py b/utils.py index d6d1401..8b36ca0 100644 --- a/utils.py +++ b/utils.py @@ -25,11 +25,11 @@ from collections import OrderedDict import xmltodict import zipfile +from cryptography.hazmat.primitives import hashes +from log_exception import UPDATE_LOGGER from build_pkcs7 import sign_ota_package from copy import copy from ctypes import cdll -from cryptography.hazmat.primitives import hashes -from log_exception import UPDATE_LOGGER operation_path = os.path.dirname(os.path.realpath(__file__)) PRODUCT = 'hi3516' @@ -118,19 +118,8 @@ class ExtInit: return False -@singleton -class OptionsManager: - """ - Options management class - """ - +class BaseOptionsManager: def __init__(self): - self.init = ExtInit() - self.parser = argparse.ArgumentParser() - - # Own parameters - self.product = None - # Entry parameters self.source_package = None self.target_package = None @@ -148,6 +137,23 @@ class OptionsManager: self.make_dir_path = None + +@singleton +class OptionsManager(BaseOptionsManager): + """ + Options management class + """ + + def __init__(self): + super().__init__() + + self.init = ExtInit() + self.parser = argparse.ArgumentParser() + + # Own parameters + self.product = None + + # Parsed package parameters self.target_package_dir = None self.target_package_config_dir = None @@ -329,7 +335,6 @@ def parse_update_config(xml_path): difference_list.append(component['@compAddr']) OPTIONS_MANAGER.incremental_img_name_list.append(split_img_name(component['#text'])) - UPDATE_LOGGER.print_log('XML file parsing completed!') ret_params = [head_list, comp_dict, whole_list, difference_list, package_version, full_image_path_list] return ret_params @@ -584,9 +589,7 @@ def get_update_info(): os.path.join(OPTIONS_MANAGER.target_package_config_dir, VERSION_MBN_PATH))) if version_mbn_content is False: - UPDATE_LOGGER.print_log( - "Get version mbn content failed!", - log_type=UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Get version mbn content failed!", log_type=UPDATE_LOGGER.ERROR_LOG) return False OPTIONS_MANAGER.version_mbn_content = version_mbn_content OPTIONS_MANAGER.board_list_file_path = os.path.join( @@ -597,9 +600,7 @@ def get_update_info(): os.path.join(OPTIONS_MANAGER.target_package_config_dir, BOARD_LIST_PATH))) if board_list_content is False: - UPDATE_LOGGER.print_log( - "Get board list content failed!", - log_type=UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Get board list content failed!", log_type=UPDATE_LOGGER.ERROR_LOG) return False OPTIONS_MANAGER.board_list_content = board_list_content @@ -615,11 +616,10 @@ def get_update_info(): OPTIONS_MANAGER.target_package_version, \ OPTIONS_MANAGER.full_image_path_list = \ parse_update_config(xml_file_path) + UPDATE_LOGGER.print_log("XML file parsing completed!") if head_info_list is False or component_info_dict is False or \ full_img_list is False or incremental_img_list is False: - UPDATE_LOGGER.print_log( - "Get parse update config xml failed!", - log_type=UPDATE_LOGGER.ERROR_LOG) + UPDATE_LOGGER.print_log("Get parse update config xml failed!", log_type=UPDATE_LOGGER.ERROR_LOG) return False OPTIONS_MANAGER.head_info_list, OPTIONS_MANAGER.component_info_dict, \ OPTIONS_MANAGER.full_img_list, OPTIONS_MANAGER.incremental_img_list = \