update_packaging_tools/create_update_package.py
unknown 77f15d35dc codex
Signed-off-by: unknown <zhangchun39@huawei.com>
2023-08-17 15:26:53 +08:00

368 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Generate the update.bin file
"""
import os
import struct
import hashlib
import subprocess
from log_exception import UPDATE_LOGGER
from utils import OPTIONS_MANAGER
from create_hashdata import HashType
from create_hashdata import CreateHash
from create_hashdata import HASH_BLOCK_SIZE
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
UPGRADE_FILE_HEADER_LEN = 180
UPGRADE_RESERVE_LEN = 16
SIGN_SHA256_LEN = 256
SIGN_SHA384_LEN = 384
UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN
TLV_SIZE = 4
UPGRADE_PKG_HEADER_SIZE = 136
UPGRADE_PKG_TIME_SIZE = 32
UPGRADE_COMPINFO_SIZE = 71
UPGRADE_COMPINFO_SIZE_L2 = 87
COMPONENT_ADDR_SIZE = 16
COMPONENT_ADDR_SIZE_L2 = 32
COMPONENT_INFO_FMT_SIZE = 5
COMPONENT_VERSION_SIZE = 10
COMPONENT_SIZE_FMT_SIZE = 8
COMPONENT_DIGEST_SIZE = 32
BLOCK_SIZE = 8192
HEADER_TLV_TYPE = 0x11
HEADER_TLV_TYPE_L2 = 0x01
# signature algorithm
SIGN_ALGO_RSA = "SHA256withRSA"
SIGN_ALGO_PSS = "SHA256withPSS"
"""
Format
H: unsigned short
I: unsigned int
B: unsigned char
s: char[]
"""
TLV_FMT = "2H"
UPGRADE_PKG_HEADER_FMT = "2I64s64s"
UPGRADE_PKG_TIME_FMT = "16s16s"
COMPONENT_INFO_FMT = "H3B"
COMPONENT_SIZE_FMT = "iI"
class CreatePackage(object):
"""
Create the update.bin file
"""
def __init__(self, head_list, component_list, save_path, key_path):
self.head_list = head_list
self.component_list = component_list
self.save_path = save_path
self.key_path = key_path
self.compinfo_offset = 0
self.component_offset = 0
self.sign_offset = 0
self.hash_info_offset = 0
if OPTIONS_MANAGER.not_l2:
self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE
self.header_tlv_type = HEADER_TLV_TYPE
else:
self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2
self.header_tlv_type = HEADER_TLV_TYPE_L2
def verify_param(self):
if self.head_list is None or self.component_list is None or \
self.save_path is None or self.key_path is None:
UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG)
return False
if os.path.isdir(self.key_path):
UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG)
return False
if self.head_list.__sizeof__() <= 0 or self.component_list.__sizeof__() <= 0:
UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG)
return False
return True
def write_pkginfo(self, package_file):
try:
# Type is 1 for package header in TLV format
header_tlv = struct.pack(TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE)
pkg_info_length = \
UPGRADE_RESERVE_LEN + TLV_SIZE + TLV_SIZE + TLV_SIZE + \
UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \
self.upgrade_compinfo_size * self.head_list.entry_count
upgrade_pkg_header = struct.pack(
UPGRADE_PKG_HEADER_FMT, pkg_info_length, self.head_list.update_file_version,
self.head_list.product_update_id, self.head_list.software_version)
# Type is 2 for time in TLV format
time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE)
upgrade_pkg_time = struct.pack(
UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time)
# Type is 5 for component in TLV format
component_tlv = struct.pack(
TLV_FMT, 0x05, self.upgrade_compinfo_size * self.head_list.entry_count)
except struct.error:
UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# write pkginfo
pkginfo = header_tlv + upgrade_pkg_header + time_tlv + upgrade_pkg_time + component_tlv
try:
package_file.write(pkginfo)
except IOError:
UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
UPDATE_LOGGER.print_log("Write package header complete")
return True
def write_component_info(self, component, package_file):
UPDATE_LOGGER.print_log("component information StartOffset:%s"\
% self.compinfo_offset)
if OPTIONS_MANAGER.not_l2:
component_addr_size = COMPONENT_ADDR_SIZE
else:
component_addr_size = COMPONENT_ADDR_SIZE_L2
try:
package_file.seek(self.compinfo_offset)
package_file.write(component.component_addr)
self.compinfo_offset += component_addr_size
package_file.seek(self.compinfo_offset)
component_info = struct.pack(
COMPONENT_INFO_FMT, component.id, component.res_type,
component.flags, component.type)
package_file.write(component_info)
self.compinfo_offset += COMPONENT_INFO_FMT_SIZE
package_file.seek(self.compinfo_offset)
package_file.write(component.version)
self.compinfo_offset += COMPONENT_VERSION_SIZE
package_file.seek(self.compinfo_offset)
component_size = struct.pack(
COMPONENT_SIZE_FMT, component.size, component.original_size)
package_file.write(component_size)
self.compinfo_offset += COMPONENT_SIZE_FMT_SIZE
package_file.seek(self.compinfo_offset)
package_file.write(component.digest)
self.compinfo_offset += COMPONENT_DIGEST_SIZE
except (struct.error, IOError):
return False
return True
def write_component(self, component, package_file):
UPDATE_LOGGER.print_log("Add component to package StartOffset:%s"\
% self.component_offset)
try:
with open(component.file_path, "rb") as component_file:
component_data = component_file.read()
package_file.seek(self.component_offset)
package_file.write(component_data)
component_len = len(component_data)
self.component_offset += component_len
except IOError:
return False
UPDATE_LOGGER.print_log("Write component complete ComponentSize:%s"\
% component_len)
return True
def calculate_hash(self, package_file):
hash_sha256 = hashlib.sha256()
remain_len = self.component_offset
package_file.seek(0)
while remain_len > BLOCK_SIZE:
hash_sha256.update(package_file.read(BLOCK_SIZE))
remain_len -= BLOCK_SIZE
if remain_len > 0:
hash_sha256.update(package_file.read(remain_len))
return hash_sha256.digest()
def calculate_header_hash(self, package_file):
hash_sha256 = hashlib.sha256()
remain_len = self.hash_info_offset
package_file.seek(0)
while remain_len > BLOCK_SIZE:
hash_sha256.update(package_file.read(BLOCK_SIZE))
remain_len -= BLOCK_SIZE
if remain_len > 0:
hash_sha256.update(package_file.read(remain_len))
return hash_sha256.digest()
def sign_digest_with_pss(self, digest):
try:
with open(self.key_path, 'rb') as f_r:
key_data = f_r.read()
private_key = serialization.load_pem_private_key(
key_data,
password=None,
backend=default_backend())
pad = padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH)
signature = private_key.sign(digest, pad, hashes.SHA256())
except (OSError, ValueError):
return False
return signature
def sign_digest(self, digest):
try:
with open(self.key_path, 'rb') as f_r:
key_data = f_r.read()
private_key = serialization.load_pem_private_key(
key_data,
password=None,
backend=default_backend())
signature = private_key.sign(digest, padding.PKCS1v15(), hashes.SHA256())
except (OSError, ValueError):
return False
return signature
def sign(self, sign_algo):
with open(self.save_path, "rb+") as package_file:
# calculate hash for .bin package
digest = self.calculate_hash(package_file)
if not digest:
UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# sign .bin package
if sign_algo == SIGN_ALGO_RSA:
signature = self.sign_digest(digest)
elif sign_algo == SIGN_ALGO_PSS:
signature = self.sign_digest_with_pss(digest)
else:
UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
if not signature:
UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
if len(signature) == SIGN_SHA384_LEN:
self.sign_offset += SIGN_SHA256_LEN
# write signed .bin package
package_file.seek(self.sign_offset)
package_file.write(signature)
UPDATE_LOGGER.print_log(
".bin package signing success! SignOffset: %s" % self.sign_offset)
return True
def sign_header(self, sign_algo, hash_check_data, package_file):
# calculate hash for .bin package
digest = self.calculate_header_hash(package_file)
if not digest:
UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# sign .bin header
if sign_algo == SIGN_ALGO_RSA:
signature = self.sign_digest(digest)
elif sign_algo == SIGN_ALGO_PSS:
signature = self.sign_digest_with_pss(digest)
else:
UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
if not signature:
UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# write signed .bin header
hash_check_data.write_signdata(signature)
package_file.seek(self.hash_info_offset)
package_file.write(hash_check_data.signdata)
self.hash_info_offset += len(hash_check_data.signdata)
UPDATE_LOGGER.print_log(
".bin package header signing success! SignOffset: %s" % self.hash_info_offset)
return True
def create_package(self):
"""
Create the update.bin file
return: update package creation result
"""
if not self.verify_param():
UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG)
return False
hash_check_data = CreateHash(HashType.SHA256, self.head_list.entry_count)
hash_check_data.write_hashinfo()
package_fd = os.open(self.save_path, os.O_RDWR | os.O_CREAT, 0o755)
with os.fdopen(package_fd, "wb+") as package_file:
# Add information to package
if not self.write_pkginfo(package_file):
UPDATE_LOGGER.print_log("Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
# Add component to package
self.compinfo_offset = UPGRADE_FILE_HEADER_LEN
self.component_offset = UPGRADE_FILE_HEADER_LEN + \
self.head_list.entry_count * self.upgrade_compinfo_size + \
UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN
for i in range(0, self.head_list.entry_count):
UPDATE_LOGGER.print_log("Add component %s" % self.component_list[i].component_addr)
if not self.write_component_info(self.component_list[i], package_file):
UPDATE_LOGGER.print_log("write component info failed: %s"
% self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
return False
if OPTIONS_MANAGER.sd_card and (not hash_check_data.write_component_hash_data(self.component_list[i])):
UPDATE_LOGGER.print_log("write component hash data failed: %s"
% self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
return False
try:
# Add descriptPackageId to package
package_file.seek(self.compinfo_offset)
package_file.write(
(self.head_list.describe_package_id.decode().ljust(UPGRADE_RESERVE_LEN, "\0")).encode())
except IOError:
UPDATE_LOGGER.print_log("Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
self.hash_info_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN
if OPTIONS_MANAGER.sd_card:
try:
# Add hash check data to package
hash_check_data.write_hashdata()
package_file.seek(self.hash_info_offset)
package_file.write(hash_check_data.hashinfo_value + hash_check_data.hashdata)
self.hash_info_offset += len(hash_check_data.hashinfo_value + hash_check_data.hashdata)
except IOError:
UPDATE_LOGGER.print_log("Add hash check data failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
return False
self.sign_header(SIGN_ALGO_RSA, hash_check_data, package_file)
self.component_offset = self.hash_info_offset
for i in range(0, self.head_list.entry_count):
if not self.write_component(self.component_list[i], package_file):
UPDATE_LOGGER.print_log("write component failed: %s"
% self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
return False
UPDATE_LOGGER.print_log("Write update package complete")
return True