Signed-off-by: chenzihan <chenzihan13@huawei.com>

fix: update
This commit is contained in:
chenzihan 2022-09-13 15:08:57 +08:00
parent 0ce446ef79
commit fd0b0056a0
15 changed files with 1259 additions and 450 deletions

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.

218
build_pkcs7.py Normal file
View File

@ -0,0 +1,218 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import hashlib
import struct
from log_exception import UPDATE_LOGGER
from asn1crypto import cms
from asn1crypto import pem
from asn1crypto import util
from asn1crypto import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
# Directory of this script; used to locate the bundled signing certificate.
operation_path = os.path.dirname(os.path.realpath(__file__))
CERT_PATH = os.path.join(operation_path, 'sign_cert/signing_cert.crt')
# Chunk size (bytes) for reading/hashing large files.
BLCOK_SIZE = 8192
# Size of the footer appended after the signature (three little-endian shorts).
FOOTER_LENGTH = 6
# Fixed size of a zip end-of-central-directory record with an empty comment.
ZIP_ECOD_LENGTH = 22
# Algorithm identifier written into the content-info header.
DIGEST_SHA256 = 672
SHA256_HASH_LEN = 32
# little-endian: algorithm id, digest length, 32-byte digest
CONTENT_INFO_FORMAT = "<2H32s"
# the length of zip eocd comment
ZIP_EOCD_COMMENT_LEN_FORMAT = "<H"
# signed package footer
SIGANTURE_FOOTER_FORMAT = "<3H"
def load_public_cert():
    """Load the signing certificate, un-armoring PEM input when detected."""
    with open(CERT_PATH, 'rb') as cert_file:
        raw_bytes = cert_file.read()
    if pem.detect(raw_bytes):
        _, _, raw_bytes = pem.unarmor(raw_bytes)
    return x509.Certificate.load(raw_bytes)
def calculate_package_hash(package_path):
    """
    Return the SHA-256 digest of the package, excluding the trailing
    zip end-of-central-directory record.
    """
    sha256 = hashlib.sha256()
    to_read = os.path.getsize(package_path) - ZIP_ECOD_LENGTH
    with open(package_path, 'rb') as package_file:
        while to_read > BLCOK_SIZE:
            sha256.update(package_file.read(BLCOK_SIZE))
            to_read -= BLCOK_SIZE
        if to_read > 0:
            sha256.update(package_file.read(to_read))
    return sha256.digest()
def sign_digest_with_pss(digset, private_key_file):
    """
    Sign a digest with RSA-PSS (MGF1/SHA-256, max salt length).
    :param digset: digest bytes to sign
    :param private_key_file: path of the PEM private key file
    :return: signature bytes, or False on I/O or key-parsing failure
    """
    try:
        with open(private_key_file, 'rb') as key_file:
            pem_data = key_file.read()
        private_key = serialization.load_pem_private_key(
            pem_data, password=None, backend=default_backend())
        pss_padding = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH)
        return private_key.sign(digset, pss_padding, hashes.SHA256())
    except (OSError, ValueError):
        return False
def sign_digest(digset, private_key_file):
    """
    Sign a digest with RSA PKCS#1 v1.5 / SHA-256.
    :param digset: digest bytes to sign
    :param private_key_file: path of the PEM private key file
    :return: signature bytes, or False on I/O or key-parsing failure
    """
    try:
        with open(private_key_file, 'rb') as key_file:
            pem_data = key_file.read()
        private_key = serialization.load_pem_private_key(
            pem_data, password=None, backend=default_backend())
        return private_key.sign(digset, padding.PKCS1v15(), hashes.SHA256())
    except (OSError, ValueError):
        return False
def create_encap_content_info(diget):
    """
    Pack the package digest into the content-info header.
    :param diget: 32-byte SHA-256 digest of the package
    :return: packed header bytes, or None when the digest is empty
    """
    if not diget:
        # Original message contained a dangling "%s" placeholder that was
        # never filled in; the path is not available here, so drop it.
        UPDATE_LOGGER.print_log("calc package hash failed!",
                                log_type=UPDATE_LOGGER.ERROR_LOG)
        return None
    return struct.pack(CONTENT_INFO_FORMAT, DIGEST_SHA256,
                       SHA256_HASH_LEN, diget)
def write_signed_package(unsigned_package, signature, signed_package):
    """
    Write the signed package: the original content minus the old 2-byte zip
    EOCD comment-length field, then the new comment length, the signature
    blob, and a footer describing the signature size.

    :param unsigned_package: path of the package to sign
    :param signature: DER-encoded signature bytes
    :param signed_package: output path for the signed package
    """
    signature_size = len(signature)
    signature_total_size = signature_size + FOOTER_LENGTH
    fd = os.open(signed_package, os.O_RDWR | os.O_CREAT, 0o777)
    # Context manager so the descriptor is closed even if a write fails.
    with os.fdopen(fd, 'wb') as f_signed:
        # Copy everything except the trailing 2-byte zip EOCD
        # comment-length field, which is rewritten below.
        remain_len = os.path.getsize(unsigned_package) - 2
        with open(unsigned_package, 'rb') as f_unsign:
            while remain_len > BLCOK_SIZE:
                f_signed.write(f_unsign.read(BLCOK_SIZE))
                remain_len -= BLCOK_SIZE
            if remain_len > 0:
                f_signed.write(f_unsign.read(remain_len))
        zip_comment_len = struct.pack(ZIP_EOCD_COMMENT_LEN_FORMAT,
                                      signature_total_size)
        f_signed.write(zip_comment_len)
        f_signed.write(signature)
        footer = struct.pack(SIGANTURE_FOOTER_FORMAT, signature_total_size,
                             0xffff, signature_total_size)
        f_signed.write(footer)
def sign_ota_package(package_path, signed_package, private_key):
    """
    Sign an OTA package and write the signed result.

    Hashes the package (minus the zip EOCD), signs the digest with
    PKCS#1 v1.5 / SHA-256, wraps the result into a CMS/PKCS#7 SignedData
    structure and appends it to the package as a zip comment.

    :param package_path: path of the unsigned package
    :param signed_package: output path for the signed package
    :param private_key: path of the PEM private key file
    :return: True on completion
    """
    digest = calculate_package_hash(package_path)
    data = create_encap_content_info(digest)
    signature = sign_digest(digest, private_key)
    # Keep the intermediate artifacts on disk (debug aid); context managers
    # replace the previously unclosed file handles.
    with open("digest", 'wb') as digest_file:
        digest_file.write(digest)
    with open("signature", 'wb') as signature_file:
        signature_file.write(signature)
    # Creating a SignedData object from cms
    signed_data = cms.SignedData()
    signed_data['version'] = 'v1'
    signed_data['encap_content_info'] = util.OrderedDict([
        ('content_type', 'data'),
        ('content', data)])
    signed_data['digest_algorithms'] = [util.OrderedDict([
        ('algorithm', 'sha256'),
        ('parameters', None)])]
    cert = load_public_cert()
    # Adding this certificate to SignedData object
    signed_data['certificates'] = [cert]
    # Setting signer info section
    signer_info = cms.SignerInfo()
    signer_info['version'] = 'v1'
    signer_info['digest_algorithm'] = util.OrderedDict([
        ('algorithm', 'sha256'),
        ('parameters', None)])
    signer_info['signature_algorithm'] = util.OrderedDict([
        ('algorithm', 'sha256_rsa'),
        ('parameters', None)])
    issuer_and_serial = cms.IssuerAndSerialNumber()
    issuer_and_serial['issuer'] = cert.issuer
    issuer_and_serial['serial_number'] = cert.serial_number
    signer_info['sid'] = cms.SignerIdentifier({
        'issuer_and_serial_number': issuer_and_serial})
    signer_info['signature'] = signature
    # Adding SignerInfo object to SignedData object
    signed_data['signer_infos'] = [signer_info]
    # Writing everything into ASN.1 object
    asn1obj = cms.ContentInfo()
    asn1obj['content_type'] = 'signed_data'
    asn1obj['content'] = signed_data
    # Dump to DER and append to the package as the signature blob.
    write_signed_package(package_path, asn1obj.dump(), signed_package)
    return True

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -46,16 +46,16 @@ optional arguments:
"""
import filecmp
import os
import sys
import argparse
import subprocess
import tempfile
import hashlib
import xmltodict
import patch_package_process
from gigraph_process import GigraphProcess
from image_class import FullUpdateImage
from image_class import is_sparse_image
from image_class import IncUpdateImage
from transfers_manager import TransfersManager
from log_exception import UPDATE_LOGGER
@ -64,6 +64,7 @@ from script_generator import VerseScript
from script_generator import RefrainScript
from script_generator import EndingScript
from update_package import build_update_package
from unpack_updater_package import UnpackPackage
from utils import OPTIONS_MANAGER
from utils import UPDATER_CONFIG
from utils import parse_partition_file_xml
@ -74,8 +75,16 @@ from utils import XML_FILE_PATH
from utils import get_update_info
from utils import SCRIPT_KEY_LIST
from utils import PER_BLOCK_SIZE
from utils import E2FSDROID_PATH
from utils import MAXIMUM_RECURSION_DEPTH
from utils import VERSE_SCRIPT_EVENT
from utils import INC_IMAGE_EVENT
from utils import DIFF_EXE_PATH
from utils import get_update_config_softversion
from vendor_script import create_vendor_script_class
sys.setrecursionlimit(MAXIMUM_RECURSION_DEPTH)
def type_check(arg):
"""
@ -132,6 +141,24 @@ def check_update_package(arg):
return False
if make_dir_path is not None:
OPTIONS_MANAGER.make_dir_path = make_dir_path
OPTIONS_MANAGER.update_package = arg
return arg
def unpack_check(arg):
    """
    Argument check, which is used to check whether
    the update package path exists.
    :param arg: The arg to check.
    :return: Check result
    """
    unpack_package = os.path.join(OPTIONS_MANAGER.update_package, arg)
    if os.path.isfile(unpack_package):
        OPTIONS_MANAGER.unpack_package_path = unpack_package
        return arg
    UPDATE_LOGGER.print_log(
        "FileNotFoundError, path: %s" % unpack_package,
        UPDATE_LOGGER.ERROR_LOG)
    OPTIONS_MANAGER.unpack_package_path = None
    return False
@ -146,8 +173,10 @@ def create_entrance_args():
signing_algorithm : signature algorithm (ECC and RSA (default))
private_key : path of the private key file
"""
description = "Tool for creating update package."
parser = argparse.ArgumentParser(description=description)
parser = OPTIONS_MANAGER.parser
parser.description = "Tool for creating update package."
parser.add_argument("-unpack", "--unpack_package", type=unpack_check,
default=None, help="Unpack updater package.")
parser.add_argument("-s", "--source_package", type=type_check,
default=None, help="Source package file path.")
parser.add_argument("target_package", type=type_check,
@ -183,36 +212,34 @@ def create_entrance_args():
help="SD Card mode, "
"Create update package for SD Card.")
args = parser.parse_args()
source_package = args.source_package
OPTIONS_MANAGER.source_package = source_package
target_package = args.target_package
OPTIONS_MANAGER.target_package = target_package
update_package = args.update_package
OPTIONS_MANAGER.update_package = update_package
no_zip = args.no_zip
OPTIONS_MANAGER.no_zip = no_zip
partition_file = args.partition_file
OPTIONS_MANAGER.partition_file = partition_file
signing_algorithm = args.signing_algorithm
OPTIONS_MANAGER.signing_algorithm = signing_algorithm
hash_algorithm = args.hash_algorithm
OPTIONS_MANAGER.hash_algorithm = hash_algorithm
private_key = args.private_key
OPTIONS_MANAGER.private_key = private_key
not_l2 = args.not_l2
OPTIONS_MANAGER.not_l2 = not_l2
signing_length = int(args.signing_length)
OPTIONS_MANAGER.signing_length = signing_length
xml_path = args.xml_path
OPTIONS_MANAGER.xml_path = xml_path
sd_card = args.sd_card
OPTIONS_MANAGER.sd_card = sd_card
def parse_args():
    """Parse command-line options and store them on OPTIONS_MANAGER."""
    args = OPTIONS_MANAGER.parser.parse_args()
    # Options copied across verbatim.
    for option in ("source_package", "target_package", "update_package",
                   "no_zip", "partition_file", "signing_algorithm",
                   "hash_algorithm", "private_key", "not_l2",
                   "xml_path", "sd_card"):
        setattr(OPTIONS_MANAGER, option, getattr(args, option))
    # signing_length arrives as a string and is stored as an int.
    OPTIONS_MANAGER.signing_length = int(args.signing_length)
ret_args = [source_package, target_package, update_package,
no_zip, not_l2, partition_file, signing_algorithm,
hash_algorithm, private_key]
def get_args():
    """Return the commonly used options as a list, in the expected order."""
    manager = OPTIONS_MANAGER
    return [
        manager.source_package,
        manager.target_package,
        manager.update_package,
        manager.no_zip,
        manager.not_l2,
        manager.partition_file,
        manager.signing_algorithm,
        manager.hash_algorithm,
        manager.private_key,
    ]
@ -227,6 +254,11 @@ def get_script_obj():
verse_script = VerseScript()
refrain_script = RefrainScript()
ending_script = EndingScript()
generate_verse_script = \
OPTIONS_MANAGER.init.invoke_event(VERSE_SCRIPT_EVENT)
if generate_verse_script:
verse_script = generate_verse_script()
else:
UPDATE_LOGGER.print_log(
"Get vendor extension object completed!"
@ -238,6 +270,30 @@ def get_script_obj():
return prelude_script, verse_script, refrain_script, ending_script
def get_source_package_path(source_package):
    """
    Resolve the source package directory onto OPTIONS_MANAGER.
    :param source_package: source package path (directory or .zip archive)
    :return: True on success, False otherwise
    """
    if os.path.isdir(source_package):
        OPTIONS_MANAGER.source_package_dir = source_package
        return True
    if source_package.endswith('.zip'):
        # Decompress the source package.
        tmp_dir_obj, unzip_dir = unzip_package(source_package)
        if tmp_dir_obj is False or unzip_dir is False:
            clear_resource(err_clear=True)
            return False
        OPTIONS_MANAGER.source_package_dir = unzip_dir
        OPTIONS_MANAGER.source_package_temp_obj = tmp_dir_obj
        return True
    UPDATE_LOGGER.print_log("Input Update Package type exception!"
                            "path: %s" % source_package,
                            UPDATE_LOGGER.ERROR_LOG)
    clear_resource(err_clear=True)
    return False
def check_incremental_args(no_zip, partition_file, source_package,
incremental_img_list):
"""
@ -276,9 +332,8 @@ def check_incremental_args(no_zip, partition_file, source_package,
clear_resource(err_clear=True)
return False
OPTIONS_MANAGER.source_package_temp_obj, \
OPTIONS_MANAGER.source_package_dir = \
unzip_package(source_package, origin='source')
if not get_source_package_path(source_package):
return False
xml_path = ''
if OPTIONS_MANAGER.source_package_dir is not False:
xml_path = os.path.join(OPTIONS_MANAGER.source_package_dir,
@ -295,6 +350,7 @@ def check_incremental_args(no_zip, partition_file, source_package,
return False
xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
package_dict = xml_content_dict.get('package', {})
get_update_config_softversion(OPTIONS_MANAGER.source_package_dir, package_dict.get('head', {}))
head_dict = package_dict.get('head', {}).get('info')
OPTIONS_MANAGER.source_package_version = head_dict.get("@softVersion")
if check_package_version(OPTIONS_MANAGER.target_package_version,
@ -400,8 +456,8 @@ def check_package_version(target_ver, source_ver):
return:
"""
try:
target_num = ''.join(target_ver.split(' ')[-1].split('.')[1:3])
source_num = ''.join(source_ver.split(' ')[-1].split('.')[1:3])
target_num = ''.join(target_ver.split(' ')[-1].replace('.', ''))
source_num = ''.join(source_ver.split(' ')[-1].replace('.', ''))
if int(target_num) <= int(source_num):
UPDATE_LOGGER.print_log(
'Target package version %s <= Source package version!'
@ -416,6 +472,115 @@ def check_package_version(target_ver, source_ver):
return True
def generate_image_map_file(image_path, map_path, image_name):
    """
    Generate the block map file for an image via e2fsdroid.
    :param image_path: image path
    :param map_path: image map file path
    :param image_name: image name
    :return: True on success, False otherwise
    """
    if not os.path.exists(image_path):
        # Fix: image_name was previously passed as the log_type positional;
        # it belongs in the message. Also restores the missing space between
        # the two adjacent string literals ("the" "source").
        UPDATE_LOGGER.print_log(
            "The source %s.img file is missing from the "
            "source package, cannot be incrementally processed. "
            % image_name,
            UPDATE_LOGGER.ERROR_LOG)
        return False
    cmd = [E2FSDROID_PATH, "-B", map_path,
           "-a", "/%s" % image_name, image_path, "-e"]
    sub_p = subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    sub_p.wait()
    # Success is judged by the map file existing afterwards.
    if not os.path.exists(map_path):
        UPDATE_LOGGER.print_log("%s generate image map file failed."
                                % image_path)
        return False
    return True
def getFileSHA256(update_package):
    """
    Calculate the SHA-256 of a file, reading it in chunks.
    :param update_package: path of the file to hash
    :return: upper-case hexadecimal digest string
    """
    sha256obj = hashlib.sha256()
    maxbuf = 8192
    # 'with' guarantees the handle is closed even if a read fails
    # (the original leaked it on exception).
    with open(update_package, 'rb') as package_file:
        while True:
            buf = package_file.read(maxbuf)
            if not buf:
                break
            sha256obj.update(buf)
    # Avoid shadowing the builtin 'hash'; hexdigest() is already a str.
    return sha256obj.hexdigest().upper()
def write_image_patch_script(partition, src_image_path, tgt_image_path,
        script_check_cmd_list, script_write_cmd_list, verse_script):
    """
    Add command content to the script.
    :param partition: image name
    :param src_image_path: source image path
    :param tgt_image_path: target image path
    :param script_check_cmd_list: incremental check command list
    :param script_write_cmd_list: incremental write command list
    :param verse_script: verse script object
    :return: True
    """
    src_sha = getFileSHA256(src_image_path)
    src_size = os.path.getsize(src_image_path)
    tgt_sha = getFileSHA256(tgt_image_path)
    tgt_size = os.path.getsize(tgt_image_path)
    sha_check_cmd = verse_script.image_sha_check(partition,
        src_size, src_sha, tgt_size, tgt_sha)
    # NOTE(review): result unused; call kept in case first_block_check
    # has side effects on verse_script — confirm and remove if not.
    first_block_check_cmd = verse_script.first_block_check(partition)
    abort_cmd = verse_script.abort(partition)
    cmd = 'if ({sha_check_cmd} != 0)' \
          '{{\n {abort_cmd}}}\n'.format(
              sha_check_cmd=sha_check_cmd,
              abort_cmd=abort_cmd)
    script_check_cmd_list.append(cmd)
    # Reuse the sizes/hashes computed above instead of re-hashing both
    # images a second time (the original recomputed them here).
    image_patch_cmd = verse_script.image_patch(
        partition, src_size, src_sha, tgt_size, tgt_sha)
    cmd = '%s_WRITE_FLAG%s' % (partition, image_patch_cmd)
    script_write_cmd_list.append(cmd)
    return True
def increment_image_diff_processing(
        partition, src_image_path, tgt_image_path,
        script_check_cmd_list, script_write_cmd_list, verse_script):
    """
    Produce a binary diff patch between the source and target image and
    record the corresponding check/write commands in the script lists.
    :param partition: image name
    :param src_image_path: source image path
    :param tgt_image_path: target image path
    :param script_check_cmd_list: incremental check command list
    :param script_write_cmd_list: incremental write command list
    :param verse_script: verse script object
    :return: True on success
    """
    patch_file_obj = tempfile.NamedTemporaryFile(
        prefix="%s_patch.dat-" % partition, mode='wb')
    # Keep the temp file object alive for the rest of the build.
    OPTIONS_MANAGER.incremental_image_file_obj_list.append(patch_file_obj)
    diff_cmd = [DIFF_EXE_PATH,
                '-s', src_image_path,
                '-d', tgt_image_path,
                '-p', patch_file_obj.name,
                '-l', '4096']
    diff_proc = subprocess.Popen(diff_cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
    output, _ = diff_proc.communicate()
    diff_proc.wait()
    if diff_proc.returncode != 0:
        raise ValueError(output)
    return write_image_patch_script(
        partition, src_image_path, tgt_image_path,
        script_check_cmd_list, script_write_cmd_list, verse_script)
def increment_image_processing(
verse_script, incremental_img_list, source_package_dir,
target_package_dir):
@ -430,7 +595,9 @@ def increment_image_processing(
script_check_cmd_list = []
script_write_cmd_list = []
patch_process = None
for each_img in incremental_img_list:
block_diff = 0
for each_img_name in OPTIONS_MANAGER.incremental_img_name_list:
each_img = each_img_name[:-4]
each_src_image_path = \
os.path.join(source_package_dir,
'%s.img' % each_img)
@ -443,66 +610,51 @@ def increment_image_processing(
each_tgt_map_path = \
os.path.join(target_package_dir,
'%s.map' % each_img)
if not os.path.exists(each_src_image_path):
UPDATE_LOGGER.print_log(
"The source %s.img file is missing from the source package, "
"the component: %s cannot be incrementally processed. "
"path: %s!" %
(each_img, each_img,
os.path.join(source_package_dir, UPDATER_CONFIG,
XML_FILE_PATH)),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
src_is_sparse = is_sparse_image(each_src_image_path)
tgt_is_sparse = is_sparse_image(each_tgt_image_path)
check_make_map_path(each_img)
cmd = ["e2fsdroid", "-B", each_src_map_path,
"-a", "/%s" % each_img, each_src_image_path]
if not src_is_sparse:
cmd.append("-e")
sub_p = subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
sub_p.wait()
if not os.path.exists(each_tgt_image_path):
UPDATE_LOGGER.print_log(
"The target %s.img file is missing from the target package, "
"the component: %s cannot be incrementally processed. "
"Please check xml config, path: %s!" %
(each_img, each_img,
os.path.join(target_package_dir, UPDATER_CONFIG,
XML_FILE_PATH)),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
cmd = ["e2fsdroid", "-B", each_tgt_map_path,
"-a", "/%s" % each_img, each_tgt_image_path]
if not tgt_is_sparse:
cmd.append("-e")
sub_p = subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
sub_p.wait()
if filecmp.cmp(each_src_image_path, each_tgt_image_path):
UPDATE_LOGGER.print_log(
"Source Image is the same as Target Image!"
"src image path: %s, tgt image path: %s" %
(each_src_image_path, each_tgt_image_path),
UPDATE_LOGGER.ERROR_LOG)
UPDATE_LOGGER.INFO_LOG)
src_generate_map = True
tgt_generate_map = True
if not os.path.exists(each_src_map_path):
src_generate_map = generate_image_map_file(each_src_image_path,
each_src_map_path, each_img)
if not src_generate_map:
UPDATE_LOGGER.print_log("The source %s.img file"
"generate map file failed. " % each_img)
if not os.path.exists(each_tgt_map_path):
tgt_generate_map = generate_image_map_file(each_tgt_image_path,
each_tgt_map_path, each_img)
if not tgt_generate_map:
UPDATE_LOGGER.print_log("The target %s.img file"
"generate map file failed. " % each_img)
if not src_generate_map or not tgt_generate_map:
if increment_image_diff_processing(each_img, each_src_image_path, each_tgt_image_path,
script_check_cmd_list, script_write_cmd_list, verse_script) is True:
continue
UPDATE_LOGGER.print_log("increment_image_diff_processing %s failed" % each_img)
clear_resource(err_clear=True)
return False
if not src_is_sparse and not tgt_is_sparse:
src_image_class = \
IncUpdateImage(each_src_image_path, each_src_map_path)
tgt_image_class = \
IncUpdateImage(each_tgt_image_path, each_tgt_map_path)
else:
raise RuntimeError
block_diff += 1
src_image_class = \
IncUpdateImage(each_src_image_path, each_src_map_path)
tgt_image_class = \
IncUpdateImage(each_tgt_image_path, each_tgt_map_path)
OPTIONS_MANAGER.src_image = src_image_class
OPTIONS_MANAGER.tgt_image = tgt_image_class
IncImage = OPTIONS_MANAGER.init.invoke_event(INC_IMAGE_EVENT)
if IncImage:
src_image_class, tgt_image_class = IncImage()
transfers_manager = TransfersManager(
each_img, tgt_image_class, src_image_class)
@ -519,11 +671,12 @@ def increment_image_processing(
patch_process.package_patch_zip.package_patch_zip()
patch_process.write_script(each_img, script_check_cmd_list,
script_write_cmd_list, verse_script)
if not check_patch_file(patch_process):
UPDATE_LOGGER.print_log(
'Verify the incremental result failed!',
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
if block_diff > 0:
if not check_patch_file(patch_process):
UPDATE_LOGGER.print_log(
'Verify the incremental result failed!',
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
UPDATE_LOGGER.print_log(
'Verify the incremental result successfully!',
UPDATE_LOGGER.INFO_LOG)
@ -571,7 +724,7 @@ def check_make_map_path(each_img):
in the environment variable, and False will be returned.
"""
try:
cmd = ["e2fsdroid", " -h"]
cmd = [E2FSDROID_PATH, " -h"]
subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
except FileNotFoundError:
@ -636,17 +789,35 @@ def check_args(private_key, source_package, target_package, update_package):
return True
create_entrance_args()
def main():
"""
Entry function.
"""
parse_args()
OPTIONS_MANAGER.product = PRODUCT
source_package, target_package, update_package, no_zip, not_l2, \
partition_file, signing_algorithm, hash_algorithm, private_key = \
create_entrance_args()
get_args()
if not_l2:
no_zip = True
# Unpack updater package
if OPTIONS_MANAGER.unpack_package_path:
package = UnpackPackage()
if not package.unpack_package():
UPDATE_LOGGER.print_log(
"Unpack update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return
UPDATE_LOGGER.print_log("Unpack update package .bin success!")
clear_resource(err_clear=True)
return
if OPTIONS_MANAGER.sd_card:
if source_package is not None or \
OPTIONS_MANAGER.xml_path is not None or \
@ -711,12 +882,14 @@ def main():
# Full processing
if len(OPTIONS_MANAGER.full_img_list) != 0:
verse_script.add_command("\n# ---- full image ----\n")
full_image_content_len_list, full_image_file_obj_list = \
full_update_image = \
FullUpdateImage(OPTIONS_MANAGER.target_package_dir,
OPTIONS_MANAGER.full_img_list, verse_script,
OPTIONS_MANAGER.full_image_path_list,
no_zip=OPTIONS_MANAGER.no_zip).\
update_full_image()
OPTIONS_MANAGER.full_img_list,
OPTIONS_MANAGER.full_img_name_list,
verse_script, OPTIONS_MANAGER.full_image_path_list,
no_zip=OPTIONS_MANAGER.no_zip)
full_image_content_len_list, full_image_file_obj_list = \
full_update_image.update_full_image()
if full_image_content_len_list is False or \
full_image_file_obj_list is False:
clear_resource(err_clear=True)

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -33,6 +33,6 @@ if __name__ == '__main__':
gen_script_name = parse_params[0]
output_path = parse_params[1]
PARSE_SCRIPTS = subprocess.check_call(
parse_scripts = subprocess.check_call(
[gen_script_name], stdout=subprocess.PIPE, cwd=output_path)
print("result:", PARSE_SCRIPTS)
print("result:", parse_scripts)

312
create_update_package.py Normal file
View File

@ -0,0 +1,312 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Generate the update.bin file
"""
import os
import struct
import hashlib
import subprocess
from log_exception import UPDATE_LOGGER
from utils import OPTIONS_MANAGER
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
# Fixed sizes/offsets (bytes) of the update.bin layout.
UPGRADE_FILE_HEADER_LEN = 180
UPGRADE_RESERVE_LEN = 16
SIGN_SHA256_LEN = 256
SIGN_SHA384_LEN = 384
# Total reserved signature area: one SHA256 slot followed by one SHA384 slot.
UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN
TLV_SIZE = 4
UPGRADE_PKG_HEADER_SIZE = 136
UPGRADE_PKG_TIME_SIZE = 32
# Component-info record size differs between L1 (not_l2) and L2 layouts.
UPGRADE_COMPINFO_SIZE = 71
UPGRADE_COMPINFO_SIZE_L2 = 87
# NOTE(review): "COMPONEBT" is a typo for "COMPONENT"; names kept because
# they are referenced elsewhere in this file.
COMPONEBT_ADDR_SIZE = 16
COMPONEBT_ADDR_SIZE_L2 = 32
COMPONENT_INFO_FMT_SIZE = 5
COMPONENT_VERSION_SIZE = 10
COMPONENT_SIZE_FMT_SIZE = 8
COMPONENT_DIGEST_SIZE = 32
# Read/hash chunk size; "BLCOK" is a typo for "BLOCK", kept for consistency.
BLCOK_SIZE = 8192
# TLV type of the package header: 0x11 for L1 (not_l2), 0x01 for L2.
HEADER_TLV_TYPE = 0x11
HEADER_TLV_TYPE_L2 = 0x01
# signature algorithm
SIGN_ALGO_RSA = "SHA256withRSA"
SIGN_ALGO_PSS = "SHA256withPSS"
"""
Format
H: unsigned short
I: unsigned int
B: unsigned char
s: char[]
"""
TLV_FMT = "2H"
UPGRADE_PKG_HEADER_FMT = "2I64s64s"
UPGRADE_PKG_TIME_FMT = "16s16s"
COMPONENT_INFO_FMT = "H3B"
COMPONENT_SIZE_FMT = "iI"
class CreatePackage(object):
"""
Create the update.bin file
"""
def __init__(self, head_list, component_list, save_path, key_path):
self.head_list = head_list
self.component_list = component_list
self.save_path = save_path
self.key_path = key_path
self.compinfo_offset = 0
self.component_offset = 0
self.sign_offset = 0
if OPTIONS_MANAGER.not_l2:
self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE
self.header_tlv_type = HEADER_TLV_TYPE
else:
self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2
self.header_tlv_type = HEADER_TLV_TYPE_L2
def verify_param(self):
if self.head_list is None or self.component_list is None or \
self.save_path is None or self.key_path is None:
UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG)
return False
if os.path.isdir(self.key_path):
UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG)
return False
if self.head_list.__sizeof__() <= 0 or self.component_list.__sizeof__() <= 0:
UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG)
return False
return True
def write_pkginfo(self, package_file):
try:
# Type is 1 for package header in TLV format
header_tlv = struct.pack(TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE)
pkgInfoLength = \
UPGRADE_RESERVE_LEN + TLV_SIZE + TLV_SIZE + TLV_SIZE + \
UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \
self.upgrade_compinfo_size * self.head_list.entry_count
upgrade_pkg_header = struct.pack(
UPGRADE_PKG_HEADER_FMT, pkgInfoLength, self.head_list.update_file_version,
self.head_list.product_update_id, self.head_list.software_version)
# Type is 2 for time in TLV format
time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE)
upgrade_pkg_time = struct.pack(
UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time)
# Type is 5 for component in TLV format
component_tlv = struct.pack(
TLV_FMT, 0x05, self.upgrade_compinfo_size * self.head_list.entry_count)
except struct.error:
UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# write pkginfo
pkginfo = header_tlv + upgrade_pkg_header + time_tlv + upgrade_pkg_time + component_tlv
try:
package_file.write(pkginfo)
except IOError:
UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
UPDATE_LOGGER.print_log("Write package header complete")
return True
def write_component_info(self, component, package_file):
UPDATE_LOGGER.print_log("component information StartOffset:%s"\
% self.compinfo_offset)
if OPTIONS_MANAGER.not_l2:
component_addr_size = COMPONEBT_ADDR_SIZE
else:
component_addr_size = COMPONEBT_ADDR_SIZE_L2
try:
package_file.seek(self.compinfo_offset)
package_file.write(component.component_addr)
self.compinfo_offset += component_addr_size
package_file.seek(self.compinfo_offset)
component_info = struct.pack(
COMPONENT_INFO_FMT, component.id, component.res_type,
component.flags, component.type)
package_file.write(component_info)
self.compinfo_offset += COMPONENT_INFO_FMT_SIZE
package_file.seek(self.compinfo_offset)
package_file.write(component.version)
self.compinfo_offset += COMPONENT_VERSION_SIZE
package_file.seek(self.compinfo_offset)
component_size = struct.pack(
COMPONENT_SIZE_FMT, component.size, component.original_size)
package_file.write(component_size)
self.compinfo_offset += COMPONENT_SIZE_FMT_SIZE
package_file.seek(self.compinfo_offset)
package_file.write(component.digest)
self.compinfo_offset += COMPONENT_DIGEST_SIZE
except (struct.error, IOError):
return False
return True
def write_component(self, component, package_file):
UPDATE_LOGGER.print_log("Add component to package StartOffset:%s"\
% self.component_offset)
try:
with open(component.file_path, "rb") as component_file:
component_data = component_file.read()
package_file.seek(self.component_offset)
package_file.write(component_data)
component_len = len(component_data)
self.component_offset += component_len
except:
return False
UPDATE_LOGGER.print_log("Write component complete ComponentSize:%s"\
% component_len)
return True
def calculate_hash(self, package_file):
h = hashlib.sha256()
remain_len = self.component_offset
package_file.seek(0)
while remain_len > BLCOK_SIZE:
h.update(package_file.read(BLCOK_SIZE))
remain_len -= BLCOK_SIZE
if remain_len > 0:
h.update(package_file.read(remain_len))
return h.digest()
def sign_digest_with_pss(self, digset):
try:
with open(self.key_path, 'rb') as f_r:
key_data = f_r.read()
private_key = serialization.load_pem_private_key(
key_data,
password=None,
backend=default_backend())
pad = padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH)
signature = private_key.sign(digset, pad, hashes.SHA256())
except (OSError, ValueError):
return False
return signature
def sign_digest(self, digset):
try:
with open(self.key_path, 'rb') as f_r:
key_data = f_r.read()
private_key = serialization.load_pem_private_key(
key_data,
password=None,
backend=default_backend())
signature = private_key.sign(digset,padding.PKCS1v15(), hashes.SHA256())
except (OSError, ValueError):
return False
return signature
def sign(self, sign_algo):
with open(self.save_path, "rb+") as package_file:
# calculate hash for .bin package
digest = self.calculate_hash(package_file)
if digest == False:
UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# sign .bin package
if sign_algo == SIGN_ALGO_RSA:
signature = self.sign_digest(digest)
elif sign_algo == SIGN_ALGO_PSS:
signature = self.sign_digest_with_pss(digest)
else:
UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
if signature == False:
UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
if len(signature) == SIGN_SHA384_LEN:
self.sign_offset += SIGN_SHA256_LEN
# write signed .bin package
package_file.seek(self.sign_offset)
package_file.write(signature)
UPDATE_LOGGER.print_log(
".bin package signing success! SignOffset: %s" % self.sign_offset)
return True
def create_package(self):
    """
    Create the update.bin file.

    Writes the package header, the component-info table and the
    component payloads, then the describe-package id and a zero-filled
    signature placeholder (filled in later by sign()).

    :return: True on success, False otherwise
    """
    if not self.verify_param():
        UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG)
        return False
    with open(self.save_path, "wb+") as package_file:
        # Add information to package
        if not self.write_pkginfo(package_file):
            UPDATE_LOGGER.print_log(
                "Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        # Add component to package
        self.compinfo_offset = UPGRADE_FILE_HEADER_LEN
        # Payload area starts after header, compinfo table, reserve and
        # both signature slots.
        self.component_offset = UPGRADE_FILE_HEADER_LEN + \
            self.head_list.entry_count * self.upgrade_compinfo_size + \
            UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN
        for i in range(0, self.head_list.entry_count):
            UPDATE_LOGGER.print_log("Add component %s"
                                    % self.component_list[i].component_addr)
            # Opening the component file up front also validates that it
            # exists before any table entry is written.
            with open(self.component_list[i].file_path, "rb") as component_file:
                if not self.write_component_info(self.component_list[i], package_file):
                    UPDATE_LOGGER.print_log("write component info failed: %s"
                                            % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False
                if not self.write_component(self.component_list[i], package_file):
                    UPDATE_LOGGER.print_log("write component failed: %s"
                                            % self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
                    return False
        try:
            # Add descriptPackageId to package
            package_file.seek(self.compinfo_offset)
            package_file.write(self.head_list.describe_package_id)
        # Narrowed from a bare "except:" which would also swallow
        # SystemExit/KeyboardInterrupt.
        except Exception:
            UPDATE_LOGGER.print_log(
                "Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
        try:
            # Reserve a zeroed signature area; sign() fills it later.
            self.sign_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN
            package_file.seek(self.sign_offset)
            sign_buffer = bytes(UPGRADE_SIGNATURE_LEN)
            package_file.write(sign_buffer)
        except Exception:
            UPDATE_LOGGER.print_log(
                "Add Sign failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
            return False
    UPDATE_LOGGER.print_log("Write update package complete")
    return True

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -22,13 +22,13 @@ DATA_SIZE = 1374024 * 1024
class GigraphProcess(object):
def __init__(self, actions_list, src_sparse_image, tgt_sparse_image):
def __init__(self, actions_list, src_image, tgt_image):
self.actions_list = actions_list
if len(self.actions_list) == 0:
raise RuntimeError
self.size_of_source_list = 0
self.src_sparse_img_obj = src_sparse_image
self.tgt_sparse_img_obj = tgt_sparse_image
self.src_img_obj = src_image
self.tgt_img_obj = tgt_image
self.vertices = len(self.actions_list)
self.data_size = DATA_SIZE

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -22,14 +22,13 @@ from hashlib import sha256
from log_exception import UPDATE_LOGGER
from blocks_manager import BlocksManager
from utils import SPARSE_IMAGE_MAGIC
from utils import HEADER_INFO_FORMAT
from utils import HEADER_INFO_LEN
from utils import OPTIONS_MANAGER
from utils import EXTEND_VALUE
from utils import FILE_MAP_ZERO_KEY
from utils import FILE_MAP_NONZERO_KEY
from utils import FILE_MAP_COPY_KEY
from utils import MAX_BLOCKS_PER_GROUP
from utils import FORBIDEN_UPDATE_IMAGE_LIST
class FullUpdateImage:
@ -37,13 +36,16 @@ class FullUpdateImage:
Full image processing class
"""
def __init__(self, target_package_images_dir, full_img_list, verse_script,
full_image_path_list, no_zip=False):
self.__target_package_images_dir = target_package_images_dir
self.__full_img_list = full_img_list
self.__verse_script = verse_script
self.__full_image_path_list = full_image_path_list
self.__no_zip = no_zip
def __init__(self, target_package_images_dir,
full_img_list, full_img_name_list,
verse_script, full_image_path_list,
no_zip=False):
self.target_package_images_dir = target_package_images_dir
self.full_img_list = full_img_list
self.full_img_name_list = full_img_name_list
self.verse_script = verse_script
self.full_image_path_list = full_image_path_list
self.no_zip = no_zip
def update_full_image(self):
"""
@ -53,43 +55,35 @@ class FullUpdateImage:
"""
full_image_file_obj_list = []
full_image_content_len_list = []
for idx, each_name in enumerate(self.__full_img_list):
for idx, each_name in enumerate(self.full_img_list):
full_image_content = self.get_full_image_content(
self.__full_image_path_list[idx])
self.full_image_path_list[idx])
img_name = self.full_img_name_list[idx][:-4]
if full_image_content is False:
UPDATE_LOGGER.print_log(
"Get full image content failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False, False
each_img = tempfile.NamedTemporaryFile(
prefix="full_image%s" % each_name, mode='wb')
dir=self.target_package_images_dir,
prefix="full_image%s" % img_name, mode='wb')
each_img.write(full_image_content)
each_img.seek(0)
full_image_content_len_list.append(len(full_image_content))
full_image_file_obj_list.append(each_img)
UPDATE_LOGGER.print_log(
"Image %s full processing completed" % each_name)
if not self.__no_zip:
"Image %s full processing completed" % img_name)
if not self.no_zip:
# No zip mode (no script command)
if is_sparse_image(each_img.name):
sparse_image_write_cmd = \
self.__verse_script.sparse_image_write(each_name)
cmd = '%s_WRITE_FLAG%s' % (
each_name, sparse_image_write_cmd)
else:
raw_image_write_cmd = \
self.__verse_script.raw_image_write(
each_name, each_name)
cmd = '%s_WRITE_FLAG%s' % (
each_name, raw_image_write_cmd)
if each_name not in ("boot", "updater_boot",
"updater", "updater_b"):
self.__verse_script.add_command(
cmd=cmd)
image_write_cmd = \
self.verse_script.image_write(each_name, img_name, each_img.name)
cmd = '%s_WRITE_FLAG%s' % (each_name, image_write_cmd)
if each_name not in FORBIDEN_UPDATE_IMAGE_LIST:
self.verse_script.add_command(cmd=cmd)
UPDATE_LOGGER.print_log(
"All full image processing completed! image count: %d" %
len(self.__full_img_list))
len(self.full_img_list))
return full_image_content_len_list, full_image_file_obj_list
@staticmethod
@ -112,25 +106,6 @@ class FullUpdateImage:
return content
def is_sparse_image(img_path):
"""
Check whether the image is a sparse image.
:param img_path: image path
:return:
"""
with open(img_path, 'rb') as f_r:
image_content = f_r.read(HEADER_INFO_LEN)
try:
header_info = struct.unpack(HEADER_INFO_FORMAT, image_content)
except struct.error:
return False
is_sparse = IncUpdateImage.image_header_info_check(header_info)[-1]
if is_sparse:
UPDATE_LOGGER.print_log("Sparse image is not supported!")
raise RuntimeError
return is_sparse
class IncUpdateImage:
"""
Increment update image class
@ -143,6 +118,7 @@ class IncUpdateImage:
:param map_path: map file path
"""
self.image_path = image_path
self.map_path = map_path
self.offset_value_list = []
self.care_block_range = None
self.extended_range = None
@ -151,9 +127,9 @@ class IncUpdateImage:
self.offset_index = []
self.block_size = None
self.total_blocks = None
self.parse_sparse_image_file(image_path, map_path)
self.parse_raw_image_file(image_path, map_path)
def parse_sparse_image_file(self, image_path, map_path):
def parse_raw_image_file(self, image_path, map_path):
"""
Parse the .img file.
:param image_path: img file path
@ -409,54 +385,4 @@ class IncUpdateImage:
yield f_r.read(this_read * self.block_size)
else:
yield fill_data * (this_read * (self.block_size >> 2))
diff_value -= this_read
@staticmethod
def image_header_info_check(header_info):
"""
Check for new messages of the header_info image.
:param header_info: header_info
:return:
"""
image_flag = True
# Sparse mirroring header ID. The magic value is fixed to 0xED26FF3A.
magic_info = header_info[0]
# major version number
major_version = header_info[1]
# minor version number
minor_version = header_info[2]
# Length of the header information.
# The value is fixed to 28 characters.
header_info_size = header_info[3]
# Header information size of the chunk.
# The length is fixed to 12 characters.
chunk_header_info_size = header_info[4]
# Number of bytes of a block. The default size is 4096.
block_size = header_info[5]
# Total number of blocks contained in the current image
# (number of blocks in a non-sparse image)
total_blocks = header_info[6]
# Total number of chunks contained in the current image
total_chunks = header_info[7]
if magic_info != SPARSE_IMAGE_MAGIC:
UPDATE_LOGGER.print_log(
"SparseImage head Magic should be 0xED26FF3A!")
image_flag = False
if major_version != 1 or minor_version != 0:
UPDATE_LOGGER.print_log(
"SparseImage Only supported major version with "
"minor version 1.0!")
image_flag = False
if header_info_size != 28:
UPDATE_LOGGER.print_log(
"SparseImage header info size must be 28! size: %u." %
header_info_size)
image_flag = False
if chunk_header_info_size != 12:
UPDATE_LOGGER.print_log(
"SparseImage Chunk header size mast to be 12! size: %u." %
chunk_header_info_size)
image_flag = False
ret_args = [block_size, chunk_header_info_size, header_info_size,
magic_info, total_blocks, total_chunks, image_flag]
return ret_args
diff_value -= this_read

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -26,7 +26,6 @@ from decimal import Decimal
from log_exception import VendorExpandError
from log_exception import UPDATE_LOGGER
from utils import OPTIONS_MANAGER
from utils import IMAGE_FILE_MOUNT_TO_PARTITION_DICT
from utils import PARTITION_FILE
from utils import TWO_STEP
from utils import TOTAL_SCRIPT_FILE_NAME
@ -59,9 +58,15 @@ class Script:
def sha_check(self, *args, **kwargs):
raise VendorExpandError(type(self), 'sha_check')
def image_sha_check(self, *args, **kwargs):
raise VendorExpandError(type(self), 'image_sha_check')
def first_block_check(self, *args, **kwargs):
raise VendorExpandError(type(self), 'first_block_check')
def image_patch(self, *args, **kwargs):
raise VendorExpandError(type(self), 'image_patch')
def abort(self, *args, **kwargs):
raise VendorExpandError(type(self), 'abort')
@ -71,9 +76,6 @@ class Script:
def block_update(self, *args, **kwargs):
raise VendorExpandError(type(self), 'block_update')
def sparse_image_write(self, *args, **kwargs):
raise VendorExpandError(type(self), 'sparse_image_write')
def raw_image_write(self, *args, **kwargs):
raise VendorExpandError(type(self), 'raw_image_write')
@ -113,6 +115,22 @@ class VerseScript(Script):
expected_sha=expected_sha, partition=partition)
return cmd
def image_sha_check(self, partition, src_size, src_hash,
dst_size, dst_hash):
"""
Get the image_sha_check command.
:param ranges_str: ranges string
:param expected_sha: hash value
:param partition: image name
:return:
"""
cmd = ('image_sha_check("/{partition}", '
'"{src_size}", "{src_hash}", '
'"{dst_size}", "{dst_hash}")').format(
partition=partition, src_size=src_size, src_hash=src_hash,
dst_size=dst_size, dst_hash=dst_hash)
return cmd
def first_block_check(self, partition):
"""
Get the first_block_check command.
@ -146,6 +164,18 @@ class VerseScript(Script):
start_progress=start_progress, dur=dur)
return cmd
def image_patch(self, partition, src_size, src_hash, target_size, target_hash):
"""
Get the image_patch command.
:param partition: image name
:return:
"""
cmd = 'image_patch("/{partition}", "{src_size}", ' \
'"{src_hash}", "{target_size}", "{target_hash}", ' \
'"{partition}.patch.dat");\n'.format(partition=partition, src_size=src_size,
src_hash=src_hash, target_size=target_size, target_hash=target_hash)
return cmd
def block_update(self, partition):
"""
Get the block_update command.
@ -157,28 +187,18 @@ class VerseScript(Script):
'"{partition}.patch.dat");\n'.format(partition=partition)
return cmd
def sparse_image_write(self, partition):
"""
Get the sparse_image_write command.
:param partition: image name
:return:
"""
cmd = 'sparse_image_write("/%s");\n' % partition
return cmd
def image_write(self, partition, image_name, image_path):
return self.raw_image_write(partition, image_name)
def raw_image_write(self, partition, image_file_name):
def raw_image_write(self, partition, image_name):
"""
Get the raw_image_write command.
:param partition: image name
:param image_file_name: image file name
:return:
"""
if partition in IMAGE_FILE_MOUNT_TO_PARTITION_DICT.keys():
partition = IMAGE_FILE_MOUNT_TO_PARTITION_DICT.get(partition)
cmd = 'raw_image_write("/%s", "/%s");\n' % (partition, image_file_name)
cmd = 'raw_image_write("/%s", "/%s");\n' % (partition, image_name)
return cmd
def get_status(self):
"""
Get the get_status command.

View File

@ -126,7 +126,7 @@ class TestUpdatePackage(unittest.TestCase):
get_hash_content("non_existent.file", 'sha256')
re_str = get_hash_content("non_existent.file", 'test_sha')
check_re = re_str is None
check_re = re_str is False
self.assertEqual(check_re, True)
clear_resource()

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -66,10 +66,10 @@ class ActionInfo:
class TransfersManager(object):
def __init__(self, partition, tgt_sparse_img_obj, src_sparse_img_obj,
def __init__(self, partition, tgt_img_obj, src_img_obj,
disable_img_diff=False):
self.tgt_sparse_img_obj = tgt_sparse_img_obj
self.src_sparse_img_obj = src_sparse_img_obj
self.tgt_img_obj = tgt_img_obj
self.src_img_obj = src_img_obj
self.partition = partition
self.disable_img_diff = disable_img_diff
@ -84,7 +84,7 @@ class TransfersManager(object):
def arrange_source_file(self):
base_names = {}
version_patterns = {}
for file_name in self.src_sparse_img_obj.file_map.keys():
for file_name in self.src_img_obj.file_map.keys():
base_name, no_version_name = self.simplify_file_name(file_name)
base_names[base_name] = file_name
version_patterns[no_version_name] = file_name
@ -98,13 +98,13 @@ class TransfersManager(object):
max_size = -1
for tgt_file_name, tgt_blocks in \
self.tgt_sparse_img_obj.file_map.items():
self.tgt_img_obj.file_map.items():
if FILE_MAP_ZERO_KEY == tgt_file_name:
UPDATE_LOGGER.print_log("Apply ZERO type!")
self.action_file_list.append(
ActionInfo(
ActionType.ZERO, tgt_file_name, FILE_MAP_ZERO_KEY,
tgt_blocks, self.src_sparse_img_obj.
tgt_blocks, self.src_img_obj.
file_map.get(FILE_MAP_ZERO_KEY, None)))
continue
if FILE_MAP_COPY_KEY == tgt_file_name:
@ -113,12 +113,12 @@ class TransfersManager(object):
ActionInfo(ActionType.NEW, tgt_file_name,
None, tgt_blocks, None))
continue
if tgt_file_name in self.src_sparse_img_obj.file_map:
if tgt_file_name in self.src_img_obj.file_map:
UPDATE_LOGGER.print_log("Apply DIFF type!")
action_info = ActionInfo(
ActionType.DIFFERENT, tgt_file_name, tgt_file_name,
tgt_blocks,
self.src_sparse_img_obj.file_map[tgt_file_name])
self.src_img_obj.file_map[tgt_file_name])
max_size = action_info.get_max_block_number() \
if action_info.get_max_block_number() > \
max_size else max_size
@ -130,7 +130,7 @@ class TransfersManager(object):
action_info = ActionInfo(
ActionType.DIFFERENT, tgt_file_name, src_file_name,
tgt_blocks,
self.src_sparse_img_obj.file_map[src_file_name])
self.src_img_obj.file_map[src_file_name])
max_size = action_info.get_max_block_number() if \
action_info.get_max_block_number() > max_size else max_size
self.action_file_list.append(action_info)

184
unpack_updater_package.py Normal file
View File

@ -0,0 +1,184 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Unpack updater package
"""
import os
import struct
import time
from create_update_package import UPGRADE_FILE_HEADER_LEN
from create_update_package import UPGRADE_COMPINFO_SIZE
from create_update_package import UPGRADE_COMPINFO_SIZE_L2
from create_update_package import COMPONEBT_ADDR_SIZE
from create_update_package import COMPONEBT_ADDR_SIZE_L2
from create_update_package import UPGRADE_RESERVE_LEN
from create_update_package import UPGRADE_SIGNATURE_LEN
from log_exception import UPDATE_LOGGER
from utils import OPTIONS_MANAGER
COMPINFO_LEN_OFFSET = 178
COMPINFO_LEN_SIZE = 2
COMPONEBT_ADDR_OFFSET = UPGRADE_FILE_HEADER_LEN
COMPONENT_TYPE_OFFSET = COMPONEBT_ADDR_OFFSET + COMPONEBT_ADDR_SIZE + 4
COMPONENT_TYPE_OFFSET_L2 = COMPONEBT_ADDR_OFFSET + COMPONEBT_ADDR_SIZE_L2 + 4
COMPONENT_TYPE_SIZE = 1
COMPONENT_SIZE_OFFSET = 11
COMPONENT_SIZE_SIZE = 4
"""
Format
H: unsigned short
I: unsigned int
B: unsigned char
s: char[]
"""
COMPINFO_LEN_FMT = "H"
COMPONEBT_TYPE_FMT = "B"
COMPONENT_SIZE_FMT = "I"
class UnpackPackage(object):
"""
Unpack the update.bin file
"""
def __init__(self):
    # Number of components recorded in the package header
    # (set by parse_package_file).
    self.count = 0
    # File offset of the current component's payload.
    self.component_offset = 0
    # Output directory for unpacked files (set in unpack_package).
    self.save_path = None
    # The component-info record layout differs between L2 and
    # non-L2 targets, so pick sizes/offsets accordingly.
    if OPTIONS_MANAGER.not_l2:
        self.compinfo_size = UPGRADE_COMPINFO_SIZE
        self.type_offset = COMPONENT_TYPE_OFFSET
        self.addr_size = COMPONEBT_ADDR_SIZE
    else:
        self.compinfo_size = UPGRADE_COMPINFO_SIZE_L2
        self.type_offset = COMPONENT_TYPE_OFFSET_L2
        self.addr_size = COMPONEBT_ADDR_SIZE_L2
    # Offsets of the address and size fields within the current record.
    self.addr_offset = COMPONEBT_ADDR_OFFSET
    self.size_offset = self.type_offset + COMPONENT_SIZE_OFFSET
def check_args(self):
    """
    Validate the unpack arguments.

    :return: True when the package path is usable, False otherwise
    """
    # NOTE(review): the original read "notos.path.exists(...)" — a
    # SyntaxError; the missing space is restored here. Also confirm
    # whether "and" was meant to be "or": as written, the check only
    # fails when the package is unreadable AND save_path is missing.
    if not os.access(OPTIONS_MANAGER.unpack_package_path, os.R_OK) and \
            not os.path.exists(self.save_path):
        UPDATE_LOGGER.print_log(
            "Access unpack_package_path fail! path: %s" % \
            OPTIONS_MANAGER.unpack_package_path, UPDATE_LOGGER.ERROR_LOG)
        return False
    return True
def parse_package_file(self, package_file):
    """
    Parse the package header.

    Reads the component-info table length and derives the component
    count and the offset of the first component payload.

    :param package_file: opened update.bin file object
    :return: True on success, False on a read/unpack failure
    """
    try:
        package_file.seek(COMPINFO_LEN_OFFSET)
        raw_len = package_file.read(COMPINFO_LEN_SIZE)
        compinfo_len, = struct.unpack(COMPINFO_LEN_FMT, raw_len)
    except (struct.error, IOError):
        return False
    # One fixed-size record per component.
    self.count = compinfo_len // self.compinfo_size
    # Payloads start after header, table, reserve and signature areas.
    self.component_offset = \
        UPGRADE_FILE_HEADER_LEN + compinfo_len + \
        UPGRADE_RESERVE_LEN + UPGRADE_SIGNATURE_LEN
    UPDATE_LOGGER.print_log(
        "parse package file success! components: %d" % self.count)
    return True
def parse_component(self, package_file):
    """
    Read one component-info record at the current table offsets.

    :param package_file: opened update.bin file object
    :return: (addr, type, size) on success, (False, False, False)
             on a read/unpack failure
    """
    try:
        package_file.seek(self.addr_offset)
        raw_addr = package_file.read(self.addr_size)
        # The address field is NUL padded; keep the leading string only.
        component_addr = raw_addr.split(b"\x00")[0].decode('utf-8')
        package_file.seek(self.type_offset)
        component_type, = struct.unpack(
            COMPONEBT_TYPE_FMT, package_file.read(COMPONENT_TYPE_SIZE))
        package_file.seek(self.size_offset)
        component_size, = struct.unpack(
            COMPONENT_SIZE_FMT, package_file.read(COMPONENT_SIZE_SIZE))
    except (struct.error, IOError):
        UPDATE_LOGGER.print_log(
            "parse component failed!", UPDATE_LOGGER.ERROR_LOG)
        return False, False, False
    return component_addr, component_type, component_size
def create_image_file(self, package_file):
    """
    Extract the component at the current table offsets into save_path.

    Advances the table offsets and the payload offset so the next call
    reads the following component.

    :param package_file: opened update.bin file object
    :return: True on success, False otherwise
    """
    component_name, component_type, component_size = \
        self.parse_component(package_file)
    # Compare against False explicitly: component_type 0 (an .img
    # component, see below) and a zero component_size are valid but
    # falsy, so the old "not component_type" test wrongly rejected them.
    if component_name is False or component_type is False or \
            component_size is False:
        UPDATE_LOGGER.print_log(
            "get component_info failed!", UPDATE_LOGGER.ERROR_LOG)
        return False
    component_name = component_name.strip('/')
    # Map the component address to an output file name.
    if component_name == "version_list":
        component_name = "VERSION.mbn"
    elif component_name == "board_list":
        component_name = "BOARD.list"
    elif component_type == 0:
        component_name = ''.join([component_name, '.img'])
    elif component_type == 1:
        component_name = ''.join([component_name, '.zip'])
    image_file_path = os.path.join(self.save_path, component_name)
    package_file.seek(self.component_offset)
    UPDATE_LOGGER.print_log("component name: %s" % component_name)
    UPDATE_LOGGER.print_log("component offset: %s" % self.component_offset)
    UPDATE_LOGGER.print_log("component size: %s" % component_size)
    with open(image_file_path, "wb") as image_file:
        image_buffer = package_file.read(component_size)
        image_file.write(image_buffer)
    # Step to the next component-info record and payload.
    self.addr_offset += self.compinfo_size
    self.type_offset += self.compinfo_size
    self.size_offset += self.compinfo_size
    self.component_offset += component_size
    UPDATE_LOGGER.print_log("Create file: %s" % image_file_path)
    return True
def unpack_package(self):
    """
    Unpack the update.bin file.

    Creates a timestamped result directory under the target package
    directory and writes every component into it.

    :return: True on success, False otherwise
    """
    UPDATE_LOGGER.print_log(
        "Start unpack updater package: %s" % OPTIONS_MANAGER.unpack_package_path)
    dir_name = ''.join(['unpack_result_', time.strftime("%H%M%S", time.localtime())])
    self.save_path = os.path.join(OPTIONS_MANAGER.target_package, dir_name)
    os.makedirs(self.save_path)
    if not self.check_args():
        UPDATE_LOGGER.print_log(
            "check args failed!", UPDATE_LOGGER.ERROR_LOG)
        return False
    with open(OPTIONS_MANAGER.unpack_package_path, "rb") as package_file:
        if not self.parse_package_file(package_file):
            UPDATE_LOGGER.print_log(
                "parse package file failed!", UPDATE_LOGGER.ERROR_LOG)
            return False
        # Extract each component in table order.
        for image_id in range(self.count):
            UPDATE_LOGGER.print_log("Start to parse component_%d" % image_id)
            if not self.create_image_file(package_file):
                UPDATE_LOGGER.print_log(
                    "create image file failed!", UPDATE_LOGGER.ERROR_LOG)
                return False
    return True

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -22,9 +22,11 @@ import time
import collections as collect
import enum
import ctypes
import zipfile
from log_exception import UPDATE_LOGGER
from script_generator import create_script
from utils import sign_package
from utils import HASH_CONTENT_LEN_DICT
from utils import OPTIONS_MANAGER
from utils import REGISTER_SCRIPT_FILE_NAME
@ -37,14 +39,16 @@ from utils import TOTAL_SCRIPT_FILE_NAME
from utils import EXTEND_COMPONENT_LIST
from utils import LINUX_HASH_ALGORITHM_DICT
from utils import BUILD_TOOLS_FILE_NAME
from utils import get_lib_api
from utils import SIGN_PACKAGE_EVENT
from create_update_package import CreatePackage
from create_update_package import SIGN_ALGO_RSA
from create_update_package import SIGN_ALGO_PSS
IS_DEL = 0
SIGNING_LENGTH_256 = 256
DIGEST_LEN = 32
HASH_VALUE_MAX_LEN = 128
class SignMethod(enum.Enum):
RSA = 1
ECC = 2
@ -89,7 +93,8 @@ def create_update_bin():
:return update_bin_obj: Update file object.
If exception occurs, return False.
"""
update_bin_obj = tempfile.NamedTemporaryFile(prefix="update_bin-")
update_bin_obj = tempfile.NamedTemporaryFile(
dir=OPTIONS_MANAGER.update_package, prefix="update_bin-")
head_value_list = OPTIONS_MANAGER.head_info_list
component_dict = OPTIONS_MANAGER.component_info_dict
@ -123,36 +128,30 @@ def create_update_bin():
all_image_file_obj_list, component_dict)
save_patch = update_bin_obj.name.encode("utf-8")
sign_info = SignInfo()
if OPTIONS_MANAGER.private_key == ON_SERVER:
private_key = "./update_package.py"
else:
private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
lib = get_lib_api()
lib_l1 = get_lib_api(is_l2=False)
if OPTIONS_MANAGER.not_l2:
lib_l1.CreatePackageWithSignInfo(
ctypes.pointer(head_list), component_list, save_patch,
private_key, ctypes.pointer(sign_info))
offset = sign_info.sign_offset
hash_code = bytes(sign_info.hash_code).decode('ascii')
sign_algo = SIGN_ALGO_PSS
else:
lib.CreatePackage(
ctypes.pointer(head_list), component_list, save_patch,
OPTIONS_MANAGER.private_key.encode("utf-8"))
offset = 0
hash_code = b""
sign_algo = SIGN_ALGO_RSA
# create bin
package = CreatePackage(head_list, component_list, save_patch,
OPTIONS_MANAGER.private_key)
if not package.create_package():
UPDATE_LOGGER.print_log(
"Create update package .bin failed!", UPDATE_LOGGER.ERROR_LOG)
return False
if not package.sign(sign_algo):
UPDATE_LOGGER.print_log(".bin package signing failed", UPDATE_LOGGER.ERROR_LOG)
return False
if OPTIONS_MANAGER.private_key == ON_SERVER:
signing_package(update_bin_obj.name,
OPTIONS_MANAGER.hash_algorithm, hash_code=hash_code,
position=offset)
UPDATE_LOGGER.print_log(".bin package signing success!")
UPDATE_LOGGER.print_log(
"Create update package .bin complete! path: %s" % update_bin_obj.name)
return update_bin_obj, lib
return update_bin_obj
def get_component_list(all_image_file_obj_list, component_dict):
@ -188,7 +187,7 @@ def get_component_list(all_image_file_obj_list, component_dict):
file_path = \
all_image_file_obj_list[idx - len(extend_component_list)].name
digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
if digest is None:
if not digest:
return
if component is None:
component = copy.copy(COMPONENT_INFO_INNIT)
@ -316,34 +315,7 @@ def get_signing_from_server(package_path, hash_algorithm, hash_code=None):
return signing_content.encode()
def signing_package(package_path, hash_algorithm, hash_code=None,
position=0, package_type='.bin'):
"""
Update package signature.
:param package_path: update package file path
:param hash_algorithm: hash algorithm
:param position: signature write location
:param hash_code: hash code
:param package_type: the type of package,.bin/.zip
:return:
"""
try:
signing_content = get_signing_from_server(
package_path, hash_algorithm, hash_code)
if position != 0:
with open(package_path, mode='rb+') as f_r:
f_r.seek(position)
f_r.write(signing_content)
else:
with open(package_path, mode='ab') as f_w:
f_w.write(signing_content)
return True
except (OSError, TypeError):
UPDATE_LOGGER.print_log("%s package signing failed!" % package_type)
raise OSError
def create_build_tools_zip(lib):
def create_build_tools_zip():
"""
Create the update package file.
:param lib: lib object
@ -381,30 +353,24 @@ def create_build_tools_zip(lib):
"updater_binary file does not exist!path: %s" % update_exe_path,
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
file_obj = tempfile.NamedTemporaryFile(prefix="build_tools-")
file_save_patch = file_obj.name.encode("utf-8")
component_list[num].file_path = update_exe_path.encode("utf-8")
component_list[num].component_addr = \
UPDATE_EXE_FILE_NAME.encode("utf-8")
component_list[num + 1].file_path = \
total_script_file_obj.name.encode("utf-8")
component_list[num + 1].component_addr = \
TOTAL_SCRIPT_FILE_NAME.encode("utf-8")
file_obj = tempfile.NamedTemporaryFile(
dir=OPTIONS_MANAGER.update_package, prefix="build_tools-")
zip_file = zipfile.ZipFile(file_obj.name, 'w', zipfile.ZIP_DEFLATED)
# add opera_script to build_tools.zip
for key, value in opera_script_dict.items():
zip_file.write(key, value)
# add update_binary to build_tools.zip
zip_file.write(update_exe_path, UPDATE_EXE_FILE_NAME)
# add loadScript to build_tools.zip
zip_file.write(total_script_file_obj.name, TOTAL_SCRIPT_FILE_NAME)
if OPTIONS_MANAGER.register_script_file_obj is not None:
component_list[num + 2].file_path = \
register_script_file_obj.name.encode("utf-8")
component_list[num + 2].component_addr = \
REGISTER_SCRIPT_FILE_NAME.encode("utf-8")
zip_file.write(register_script_file_obj.name, REGISTER_SCRIPT_FILE_NAME)
zip_file.close()
if OPTIONS_MANAGER.private_key == ON_SERVER:
private_key = "./update_package.py"
else:
private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
lib.CreatePackage(
ctypes.pointer(head_list), component_list, file_save_patch,
private_key)
return file_obj
@ -420,84 +386,64 @@ def build_update_package(no_zip, update_package, prelude_script,
:param ending_script: ending object
:return: If exception occurs, return False.
"""
update_bin_obj, lib = create_update_bin()
OPTIONS_MANAGER.update_bin_obj = update_bin_obj
update_bin_obj = create_update_bin()
if update_bin_obj:
OPTIONS_MANAGER.update_bin_obj = update_bin_obj
else:
return False
update_file_name = ''.join(
[OPTIONS_MANAGER.product, '_ota_',
time.strftime("%H%M%S", time.localtime())])
if OPTIONS_MANAGER.sd_card :
type = "sd"
elif OPTIONS_MANAGER.source_package :
type = "diff"
else :
type = "full"
if OPTIONS_MANAGER.not_l2:
update_file_name = ''.join(
["updater_", OPTIONS_MANAGER.target_package_version.replace(" ", "_")])
else :
update_file_name = ''.join(
["updater_", type])
if not no_zip:
update_package_path = os.path.join(
update_package, '%s.zip' % update_file_name)
update_package, '%s_unsigned.zip' % update_file_name)
OPTIONS_MANAGER.update_package_file_path = update_package_path
create_script(prelude_script, verse_script,
refrain_script, ending_script)
build_tools_zip_obj = create_build_tools_zip(lib)
build_tools_zip_obj = create_build_tools_zip()
if build_tools_zip_obj is False:
UPDATE_LOGGER.print_log(
"Create build tools zip failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj
head_list = PkgHeader()
if OPTIONS_MANAGER.signing_length != SIGNING_LENGTH_256:
# PKG_DIGEST_TYPE_SHA384 3,use sha384
head_list.digest_method = 3
else:
# PKG_DIGEST_TYPE_SHA256 2,use sha256
head_list.digest_method = 2
if OPTIONS_MANAGER.private_key == ON_SERVER:
head_list.sign_method = 0
else:
if OPTIONS_MANAGER.signing_algorithm == "ECC":
# signing algorithm use ECC
head_list.sign_method = SignMethod.ECC.value
else:
# signing algorithm use RSA
head_list.sign_method = SignMethod.RSA.value
head_list.pkg_type = 2
head_list.pkg_flags = 0
head_list.entry_count = 2
pkg_components = PkgComponent * 2
component_list = pkg_components()
component_list[0].file_path = \
OPTIONS_MANAGER.update_bin_obj.name.encode("utf-8")
component_list[0].component_addr = 'update.bin'.encode("utf-8")
component_list[1].file_path = \
OPTIONS_MANAGER.build_tools_zip_obj.name.encode("utf-8")
component_list[1].component_addr = \
BUILD_TOOLS_FILE_NAME.encode("utf-8")
sign_info = SignInfo()
if OPTIONS_MANAGER.private_key == ON_SERVER:
private_key = "./update_package.py"
else:
private_key = OPTIONS_MANAGER.private_key.encode("utf-8")
lib = get_lib_api()
lib_l1 = get_lib_api(is_l2=False)
if OPTIONS_MANAGER.not_l2:
lib_l1.CreatePackageWithSignInfo(
ctypes.pointer(head_list), component_list,
update_package_path.encode("utf-8"),
private_key, ctypes.pointer(sign_info))
else:
lib.CreatePackage(
ctypes.pointer(head_list), component_list,
update_package_path.encode("utf-8"),
OPTIONS_MANAGER.private_key.encode("utf-8"))
zip_file = zipfile.ZipFile(update_package_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
# add update.bin to update package
zip_file.write(OPTIONS_MANAGER.update_bin_obj.name, "update.bin")
# add build_tools.zip to update package
zip_file.write(OPTIONS_MANAGER.build_tools_zip_obj.name, BUILD_TOOLS_FILE_NAME)
zip_file.close()
if OPTIONS_MANAGER.private_key == ON_SERVER:
hash_code = "".join(["%x" % each for each in sign_info.hash_code])
signing_package(update_bin_obj.name,
OPTIONS_MANAGER.hash_algorithm, hash_code,
package_type='.zip')
signed_package = os.path.join(
update_package, "%s.zip" % update_file_name)
OPTIONS_MANAGER.signed_package = signed_package
UPDATE_LOGGER.print_log(".zip package signing success!")
UPDATE_LOGGER.print_log(
"Create update package .bin complete! path: %s" %
update_package_path)
sign_ota_package = \
OPTIONS_MANAGER.init.invoke_event(SIGN_PACKAGE_EVENT)
if sign_ota_package:
sign_result = sign_ota_package()
else:
sign_result = sign_package()
if not sign_result:
UPDATE_LOGGER.print_log("Sign ota package fail", UPDATE_LOGGER.ERROR_LOG)
return False
if os.path.exists(update_package_path):
os.remove(update_package_path)
else:
update_package_path = os.path.join(
update_package, '%s.bin' % update_file_name)
@ -522,10 +468,10 @@ def get_hash_content(file_path, hash_algorithm):
UPDATE_LOGGER.print_log(
"Unsupported hash algorithm! %s" % hash_algorithm,
log_type=UPDATE_LOGGER.ERROR_LOG)
return None
return False
if not os.path.exists(file_path):
UPDATE_LOGGER.print_log(
"%s failed!" % LINUX_HASH_ALGORITHM_DICT.get(hash_algorithm),
"%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm],
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
process_obj = subprocess.Popen(

148
utils.py
View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -16,17 +16,18 @@
"""
Description : Defining constants, common interface
"""
import argparse
import json
import os
import shutil
import tempfile
import xmltodict
import zipfile
from collections import OrderedDict
from collections import OrderedDict
from build_pkcs7 import sign_ota_package
from copy import copy
from ctypes import cdll
import xmltodict
from cryptography.hazmat.primitives import hashes
from log_exception import UPDATE_LOGGER
@ -45,6 +46,7 @@ XML_FILE_PATH = "updater_specified_config.xml"
SO_PATH = os.path.join(operation_path, 'lib/libpackage.so')
SO_PATH_L1 = os.path.join(operation_path, 'lib/libpackageL1.so')
DIFF_EXE_PATH = os.path.join(operation_path, 'lib/diff')
E2FSDROID_PATH = os.path.join(operation_path, 'lib/e2fsdroid')
MISC_INFO_PATH = "misc_info.txt"
VERSION_MBN_PATH = "VERSION.mbn"
BOARD_LIST_PATH = "BOARD.list"
@ -54,16 +56,6 @@ PARTITION_FILE = "partitions_file"
IGNORED_PARTITION_LIST = ['fastboot', 'boot', 'kernel', 'misc',
'updater', 'userdata']
SPARSE_IMAGE_MAGIC = 0xED26FF3A
# The related data is the original data of blocks set by chunk_size.
CHUNK_TYPE_RAW = 0xCAC1
# The related data is 4-byte fill data.
CHUNK_TYPE_FILL = 0xCAC2
# The related data is empty.
CHUNK_TYPE_DONT_CARE = 0xCAC3
# CRC32 block
CHUNK_TYPE_CRC32 = 0xCAC4
HASH_ALGORITHM_DICT = {'sha256': hashes.SHA256, 'sha384': hashes.SHA384}
LINUX_HASH_ALGORITHM_DICT = {'sha256': 'sha256sum', 'sha384': 'sha384sum'}
HASH_CONTENT_LEN_DICT = {'sha256': 64, 'sha384': 96}
@ -72,13 +64,6 @@ COMPONENT_INFO_INNIT = ['', '000', '00', '0', '0o00']
ON_SERVER = "ON_SERVER"
# The length of the header information of the sparse image is 28 bytes.
HEADER_INFO_FORMAT = "<I4H4I"
HEADER_INFO_LEN = 28
# The length of the chunk information of the sparse image is 12 bytes.
CHUNK_INFO_FORMAT = "<2H2I"
CHUNK_INFO_LEN = 12
EXTEND_VALUE = 512
FILE_MAP_ZERO_KEY = "__ZERO"
@ -89,9 +74,15 @@ MAX_BLOCKS_PER_GROUP = BLOCK_LIMIT = 1024
PER_BLOCK_SIZE = 4096
TWO_STEP = "updater"
SD_CARD_IMAGE_LIST = ["system", "vendor", "userdata"]
# Image file mount to partition, Correspondence dict.
IMAGE_FILE_MOUNT_TO_PARTITION_DICT = {"userdata": "data"}
VERSE_SCRIPT_EVENT = 0
INC_IMAGE_EVENT = 1
SIGN_PACKAGE_EVENT = 2
# Image file can not support update.
FORBIDEN_UPDATE_IMAGE_LIST = ["updater_boot", "updater_b", "ptable"]
# 1000000: max number of function recursion depth
MAXIMUM_RECURSION_DEPTH = 1000000
def singleton(cls):
@ -105,6 +96,30 @@ def singleton(cls):
return _singleton
class ExtInit:
"""
Init event for ext
"""
def __init__(self):
self.funs = []
def reg_event(self, evevt_id, funs):
self.funs.append([evevt_id, funs])
UPDATE_LOGGER.print_log(
'register event %s: %s' % (evevt_id, funs.__name__))
def invoke_event(self, evevt_id):
UPDATE_LOGGER.print_log(self.funs)
for event in self.funs:
funs = event[1]
if event[0] == evevt_id and funs is not None:
UPDATE_LOGGER.print_log(
'invoke event %s: %s' % (evevt_id, funs.__name__))
return funs
return False
@singleton
class OptionsManager:
"""
@ -112,6 +127,8 @@ class OptionsManager:
"""
def __init__(self):
self.init = ExtInit()
self.parser = argparse.ArgumentParser()
# Own parameters
self.product = None
@ -120,6 +137,7 @@ class OptionsManager:
self.source_package = None
self.target_package = None
self.update_package = None
self.unpack_package_path = None
self.no_zip = False
self.partition_file = None
self.signing_algorithm = None
@ -149,7 +167,9 @@ class OptionsManager:
self.head_info_list = []
self.component_info_dict = {}
self.full_img_list = []
self.full_img_name_list = []
self.incremental_img_list = []
self.incremental_img_name_list = []
self.target_package_version = None
self.source_package_version = None
self.two_step = False
@ -165,6 +185,8 @@ class OptionsManager:
self.incremental_content_len_list = []
self.incremental_image_file_obj_list = []
self.incremental_temp_file_obj_list = []
self.src_image = None
self.tgt_image = None
# Script parameters
self.opera_script_file_name_dict = {}
@ -178,12 +200,12 @@ class OptionsManager:
self.update_bin_obj = None
self.build_tools_zip_obj = None
self.update_package_file_path = None
self.signed_package = None
OPTIONS_MANAGER = OptionsManager()
def unzip_package(package_path, origin='target'):
unzip_package(package_path, origin='target'):
"""
Decompress the zip package.
:param package_path: zip package path
@ -230,7 +252,27 @@ def unzip_package(package_path, origin='target'):
return tmp_dir_obj, unzip_dir
def parse_update_config(xml_path):
split_img_name(image_path):
"""
Split the image name by image path
:return image name
"""
tmp_path = image_path
str_list = tmp_path.split("/")
return str_list[-1]
get_update_config_softversion(mbn_dir, head_info_dict):
soft_version_file = head_info_dict.get('softVersionFile')
if soft_version_file is not None:
mbn_path = os.path.join(mbn_dir, soft_version_file)
if os.path.exists(mbn_path):
with open(mbn_path, 'r') as mbn_file:
head_info_dict['info']["@softVersion"] = mbn_file.read()
parse_update_config(xml_path):
"""
Parse the XML configuration file.
:param xml_path: XML configuration file path
@ -250,6 +292,7 @@ def parse_update_config(xml_path):
return ret_params
xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
package_dict = xml_content_dict.get('package', {})
get_update_config_softversion(OPTIONS_MANAGER.target_package_dir, package_dict.get('head', {}))
head_dict = package_dict.get('head', {}).get('info')
package_version = head_dict.get("@softVersion")
# component
@ -282,29 +325,27 @@ def parse_update_config(xml_path):
if component['@compType'] == '0':
whole_list.append(component['@compAddr'])
OPTIONS_MANAGER.full_img_name_list.\
append(split_img_name(component['#text']))
tem_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
component.get("#text", None))
full_image_path_list.append(tem_path)
elif component['@compType'] == '1':
difference_list.append(component['@compAddr'])
OPTIONS_MANAGER.incremental_img_name_list.\
append(split_img_name(component['#text']))
if component['@compAddr'] == TWO_STEP:
two_step = True
UPDATE_LOGGER.print_log('XML file parsing completed!')
if OPTIONS_MANAGER.sd_card:
whole_list = SD_CARD_IMAGE_LIST
difference_list = []
full_image_path_list = \
[os.path.join(OPTIONS_MANAGER.target_package_dir, "%s.img" % each)
for each in SD_CARD_IMAGE_LIST]
ret_params = [head_list, component_dict,
whole_list, difference_list, package_version,
full_image_path_list, two_step]
return ret_params
def partitions_conversion(data):
partitions_conversion(data):
"""
Convert the start or length data in the partition table through
multiply 1024 * 1024 and return the data.
@ -319,7 +360,7 @@ def partitions_conversion(data):
return False
def parse_partition_file_xml(xml_path):
parse_partition_file_xml(xml_path):
"""
Parse the XML configuration file.
:param xml_path: XML configuration file path
@ -359,13 +400,14 @@ def parse_partition_file_xml(xml_path):
new_part_list.append(part_dict)
part_json = json.dumps(new_part_list)
part_json = '{"Partition": %s}' % part_json
file_obj = tempfile.NamedTemporaryFile(prefix="partition_file-", mode='wb')
file_obj = tempfile.NamedTemporaryFile(
dir=OPTIONS_MANAGER.target_package_dir, prefix="partition_file-", mode='wb')
file_obj.write(part_json.encode())
file_obj.seek(0)
return file_obj, partitions_list, partitions_file_path_list
def expand_component(component_dict):
expand_component(component_dict):
"""
Append components such as VERSION.mbn and board list.
:param component_dict: component information dict
@ -382,7 +424,7 @@ def expand_component(component_dict):
component_dict[each] = tmp_info_list
def clear_options():
clear_options():
"""
Clear OPTIONS_MANAGER.
"""
@ -451,7 +493,7 @@ def clear_options():
OPTIONS_MANAGER.update_package_file_path = None
def clear_resource(err_clear=False):
clear_resource(err_clear=False):
"""
Clear resources, close temporary files, and clear temporary paths.
:param err_clear: whether to clear errors
@ -491,7 +533,7 @@ def clear_resource(err_clear=False):
clear_options()
def clear_file_obj(err_clear):
clear_file_obj(err_clear):
"""
Clear resources and temporary file objects.
:param err_clear: whether to clear errors
@ -527,7 +569,7 @@ def clear_file_obj(err_clear):
UPDATE_LOGGER.print_log('Resource cleaning completed!')
def get_file_content(file_path, file_name=None):
get_file_content(file_path, file_name=None):
"""
Read the file content.
:param file_path: file path
@ -546,7 +588,7 @@ def get_file_content(file_path, file_name=None):
return file_content
def get_update_info():
get_update_info():
"""
Parse the configuration file to obtain the update information.
:return: update information if any; false otherwise.
@ -605,20 +647,8 @@ def get_update_info():
return True
def get_lib_api(is_l2=True):
"""
Get the so API.
:param is_l2: Is it L2 so
:return:
"""
if is_l2:
so_path = SO_PATH
else:
so_path = SO_PATH_L1
if not os.path.exists(so_path):
UPDATE_LOGGER.print_log(
"So does not exist! so path: %s" % so_path,
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
lib = cdll.LoadLibrary(so_path)
return lib
sign_package():
return sign_ota_package(
OPTIONS_MANAGER.update_package_file_path,
OPTIONS_MANAGER.signed_package,
OPTIONS_MANAGER.private_key)

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
@ -25,7 +25,7 @@ from utils import OPTIONS_MANAGER
from utils import REGISTER_SCRIPT_FILE_NAME
def create_vendor_script_class():
create_vendor_script_class():
"""
Obtain the extended script objects of the vendor. By default,
the return value is [None] * len(SCRIPT_KEY_LIST).
@ -47,27 +47,27 @@ def create_vendor_script_class():
class VendorPreludeScript(PreludeScript):
def __init__(self):
__init__(self):
super().__init__()
class VendorVerseScript(VerseScript):
def __init__(self):
__init__(self):
super().__init__()
class VendorRefrainScript(RefrainScript):
def __init__(self):
__init__(self):
super().__init__()
class VendorEndingScript(EndingScript):
def __init__(self):
__init__(self):
super().__init__()
class ExtensionCmdRegister:
def __init__(self):
__init__(self):
"""
ExtensionCmdRegister for vendor extended command registration.
self.__cmd_in_so_dict needs the dict of extension command and
@ -80,7 +80,7 @@ class ExtensionCmdRegister:
"""
self.__cmd_in_so_dict = {}
def generate_register_cmd_script(self):
generate_register_cmd_script(self):
"""
Generate the register script.
"""