mirror of
https://gitee.com/openharmony/update_packaging_tools
synced 2024-11-26 16:31:21 +00:00
77f15d35dc
Signed-off-by: unknown <zhangchun39@huawei.com>
368 lines
15 KiB
Python
368 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2022 Huawei Device Co., Ltd.
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
Description : Generate the update.bin file
|
|
"""
|
|
import os
|
|
import struct
|
|
import hashlib
|
|
import subprocess
|
|
from log_exception import UPDATE_LOGGER
|
|
from utils import OPTIONS_MANAGER
|
|
from create_hashdata import HashType
|
|
from create_hashdata import CreateHash
|
|
from create_hashdata import HASH_BLOCK_SIZE
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
UPGRADE_FILE_HEADER_LEN = 180
|
|
UPGRADE_RESERVE_LEN = 16
|
|
SIGN_SHA256_LEN = 256
|
|
SIGN_SHA384_LEN = 384
|
|
UPGRADE_SIGNATURE_LEN = SIGN_SHA256_LEN + SIGN_SHA384_LEN
|
|
TLV_SIZE = 4
|
|
UPGRADE_PKG_HEADER_SIZE = 136
|
|
UPGRADE_PKG_TIME_SIZE = 32
|
|
UPGRADE_COMPINFO_SIZE = 71
|
|
UPGRADE_COMPINFO_SIZE_L2 = 87
|
|
COMPONENT_ADDR_SIZE = 16
|
|
COMPONENT_ADDR_SIZE_L2 = 32
|
|
COMPONENT_INFO_FMT_SIZE = 5
|
|
COMPONENT_VERSION_SIZE = 10
|
|
COMPONENT_SIZE_FMT_SIZE = 8
|
|
COMPONENT_DIGEST_SIZE = 32
|
|
BLOCK_SIZE = 8192
|
|
HEADER_TLV_TYPE = 0x11
|
|
HEADER_TLV_TYPE_L2 = 0x01
|
|
# signature algorithm
|
|
SIGN_ALGO_RSA = "SHA256withRSA"
|
|
SIGN_ALGO_PSS = "SHA256withPSS"
|
|
|
|
"""
|
|
Format
|
|
H: unsigned short
|
|
I: unsigned int
|
|
B: unsigned char
|
|
s: char[]
|
|
"""
|
|
TLV_FMT = "2H"
|
|
UPGRADE_PKG_HEADER_FMT = "2I64s64s"
|
|
UPGRADE_PKG_TIME_FMT = "16s16s"
|
|
COMPONENT_INFO_FMT = "H3B"
|
|
COMPONENT_SIZE_FMT = "iI"
|
|
|
|
|
|
class CreatePackage(object):
|
|
"""
|
|
Create the update.bin file
|
|
"""
|
|
|
|
def __init__(self, head_list, component_list, save_path, key_path):
|
|
self.head_list = head_list
|
|
self.component_list = component_list
|
|
self.save_path = save_path
|
|
self.key_path = key_path
|
|
self.compinfo_offset = 0
|
|
self.component_offset = 0
|
|
self.sign_offset = 0
|
|
self.hash_info_offset = 0
|
|
|
|
if OPTIONS_MANAGER.not_l2:
|
|
self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE
|
|
self.header_tlv_type = HEADER_TLV_TYPE
|
|
else:
|
|
self.upgrade_compinfo_size = UPGRADE_COMPINFO_SIZE_L2
|
|
self.header_tlv_type = HEADER_TLV_TYPE_L2
|
|
|
|
def verify_param(self):
|
|
if self.head_list is None or self.component_list is None or \
|
|
self.save_path is None or self.key_path is None:
|
|
UPDATE_LOGGER.print_log("Check param failed!", UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
if os.path.isdir(self.key_path):
|
|
UPDATE_LOGGER.print_log("Invalid keyname", UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
if self.head_list.__sizeof__() <= 0 or self.component_list.__sizeof__() <= 0:
|
|
UPDATE_LOGGER.print_log("Invalid param", UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
return True
|
|
|
|
def write_pkginfo(self, package_file):
|
|
try:
|
|
# Type is 1 for package header in TLV format
|
|
header_tlv = struct.pack(TLV_FMT, self.header_tlv_type, UPGRADE_PKG_HEADER_SIZE)
|
|
pkg_info_length = \
|
|
UPGRADE_RESERVE_LEN + TLV_SIZE + TLV_SIZE + TLV_SIZE + \
|
|
UPGRADE_PKG_HEADER_SIZE + UPGRADE_PKG_TIME_SIZE + \
|
|
self.upgrade_compinfo_size * self.head_list.entry_count
|
|
upgrade_pkg_header = struct.pack(
|
|
UPGRADE_PKG_HEADER_FMT, pkg_info_length, self.head_list.update_file_version,
|
|
self.head_list.product_update_id, self.head_list.software_version)
|
|
|
|
# Type is 2 for time in TLV format
|
|
time_tlv = struct.pack(TLV_FMT, 0x02, UPGRADE_PKG_TIME_SIZE)
|
|
upgrade_pkg_time = struct.pack(
|
|
UPGRADE_PKG_TIME_FMT, self.head_list.date, self.head_list.time)
|
|
|
|
# Type is 5 for component in TLV format
|
|
component_tlv = struct.pack(
|
|
TLV_FMT, 0x05, self.upgrade_compinfo_size * self.head_list.entry_count)
|
|
except struct.error:
|
|
UPDATE_LOGGER.print_log("Pack fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
# write pkginfo
|
|
pkginfo = header_tlv + upgrade_pkg_header + time_tlv + upgrade_pkg_time + component_tlv
|
|
try:
|
|
package_file.write(pkginfo)
|
|
except IOError:
|
|
UPDATE_LOGGER.print_log("write fail!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
UPDATE_LOGGER.print_log("Write package header complete")
|
|
return True
|
|
|
|
def write_component_info(self, component, package_file):
|
|
UPDATE_LOGGER.print_log("component information StartOffset:%s"\
|
|
% self.compinfo_offset)
|
|
if OPTIONS_MANAGER.not_l2:
|
|
component_addr_size = COMPONENT_ADDR_SIZE
|
|
else:
|
|
component_addr_size = COMPONENT_ADDR_SIZE_L2
|
|
|
|
try:
|
|
package_file.seek(self.compinfo_offset)
|
|
package_file.write(component.component_addr)
|
|
self.compinfo_offset += component_addr_size
|
|
|
|
package_file.seek(self.compinfo_offset)
|
|
component_info = struct.pack(
|
|
COMPONENT_INFO_FMT, component.id, component.res_type,
|
|
component.flags, component.type)
|
|
package_file.write(component_info)
|
|
self.compinfo_offset += COMPONENT_INFO_FMT_SIZE
|
|
|
|
package_file.seek(self.compinfo_offset)
|
|
package_file.write(component.version)
|
|
self.compinfo_offset += COMPONENT_VERSION_SIZE
|
|
|
|
package_file.seek(self.compinfo_offset)
|
|
component_size = struct.pack(
|
|
COMPONENT_SIZE_FMT, component.size, component.original_size)
|
|
package_file.write(component_size)
|
|
self.compinfo_offset += COMPONENT_SIZE_FMT_SIZE
|
|
|
|
package_file.seek(self.compinfo_offset)
|
|
package_file.write(component.digest)
|
|
self.compinfo_offset += COMPONENT_DIGEST_SIZE
|
|
except (struct.error, IOError):
|
|
return False
|
|
return True
|
|
|
|
def write_component(self, component, package_file):
|
|
UPDATE_LOGGER.print_log("Add component to package StartOffset:%s"\
|
|
% self.component_offset)
|
|
try:
|
|
with open(component.file_path, "rb") as component_file:
|
|
component_data = component_file.read()
|
|
package_file.seek(self.component_offset)
|
|
package_file.write(component_data)
|
|
component_len = len(component_data)
|
|
self.component_offset += component_len
|
|
except IOError:
|
|
return False
|
|
UPDATE_LOGGER.print_log("Write component complete ComponentSize:%s"\
|
|
% component_len)
|
|
return True
|
|
|
|
def calculate_hash(self, package_file):
|
|
hash_sha256 = hashlib.sha256()
|
|
remain_len = self.component_offset
|
|
|
|
package_file.seek(0)
|
|
while remain_len > BLOCK_SIZE:
|
|
hash_sha256.update(package_file.read(BLOCK_SIZE))
|
|
remain_len -= BLOCK_SIZE
|
|
if remain_len > 0:
|
|
hash_sha256.update(package_file.read(remain_len))
|
|
return hash_sha256.digest()
|
|
|
|
def calculate_header_hash(self, package_file):
|
|
hash_sha256 = hashlib.sha256()
|
|
remain_len = self.hash_info_offset
|
|
|
|
package_file.seek(0)
|
|
while remain_len > BLOCK_SIZE:
|
|
hash_sha256.update(package_file.read(BLOCK_SIZE))
|
|
remain_len -= BLOCK_SIZE
|
|
if remain_len > 0:
|
|
hash_sha256.update(package_file.read(remain_len))
|
|
return hash_sha256.digest()
|
|
|
|
def sign_digest_with_pss(self, digest):
|
|
try:
|
|
with open(self.key_path, 'rb') as f_r:
|
|
key_data = f_r.read()
|
|
private_key = serialization.load_pem_private_key(
|
|
key_data,
|
|
password=None,
|
|
backend=default_backend())
|
|
|
|
pad = padding.PSS(
|
|
mgf=padding.MGF1(hashes.SHA256()),
|
|
salt_length=padding.PSS.MAX_LENGTH)
|
|
|
|
signature = private_key.sign(digest, pad, hashes.SHA256())
|
|
except (OSError, ValueError):
|
|
return False
|
|
return signature
|
|
|
|
def sign_digest(self, digest):
|
|
try:
|
|
with open(self.key_path, 'rb') as f_r:
|
|
key_data = f_r.read()
|
|
private_key = serialization.load_pem_private_key(
|
|
key_data,
|
|
password=None,
|
|
backend=default_backend())
|
|
signature = private_key.sign(digest, padding.PKCS1v15(), hashes.SHA256())
|
|
except (OSError, ValueError):
|
|
return False
|
|
return signature
|
|
|
|
def sign(self, sign_algo):
|
|
with open(self.save_path, "rb+") as package_file:
|
|
# calculate hash for .bin package
|
|
digest = self.calculate_hash(package_file)
|
|
if not digest:
|
|
UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
|
|
log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
# sign .bin package
|
|
if sign_algo == SIGN_ALGO_RSA:
|
|
signature = self.sign_digest(digest)
|
|
elif sign_algo == SIGN_ALGO_PSS:
|
|
signature = self.sign_digest_with_pss(digest)
|
|
else:
|
|
UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
if not signature:
|
|
UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
if len(signature) == SIGN_SHA384_LEN:
|
|
self.sign_offset += SIGN_SHA256_LEN
|
|
|
|
# write signed .bin package
|
|
package_file.seek(self.sign_offset)
|
|
package_file.write(signature)
|
|
UPDATE_LOGGER.print_log(
|
|
".bin package signing success! SignOffset: %s" % self.sign_offset)
|
|
return True
|
|
|
|
def sign_header(self, sign_algo, hash_check_data, package_file):
|
|
# calculate hash for .bin package
|
|
digest = self.calculate_header_hash(package_file)
|
|
if not digest:
|
|
UPDATE_LOGGER.print_log("calculate hash for .bin package failed",
|
|
log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
# sign .bin header
|
|
if sign_algo == SIGN_ALGO_RSA:
|
|
signature = self.sign_digest(digest)
|
|
elif sign_algo == SIGN_ALGO_PSS:
|
|
signature = self.sign_digest_with_pss(digest)
|
|
else:
|
|
UPDATE_LOGGER.print_log("invalid sign_algo!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
if not signature:
|
|
UPDATE_LOGGER.print_log("sign .bin package failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
# write signed .bin header
|
|
hash_check_data.write_signdata(signature)
|
|
package_file.seek(self.hash_info_offset)
|
|
package_file.write(hash_check_data.signdata)
|
|
self.hash_info_offset += len(hash_check_data.signdata)
|
|
UPDATE_LOGGER.print_log(
|
|
".bin package header signing success! SignOffset: %s" % self.hash_info_offset)
|
|
return True
|
|
|
|
def create_package(self):
|
|
"""
|
|
Create the update.bin file
|
|
return: update package creation result
|
|
"""
|
|
if not self.verify_param():
|
|
UPDATE_LOGGER.print_log("verify param failed!", UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
hash_check_data = CreateHash(HashType.SHA256, self.head_list.entry_count)
|
|
hash_check_data.write_hashinfo()
|
|
package_fd = os.open(self.save_path, os.O_RDWR | os.O_CREAT, 0o755)
|
|
with os.fdopen(package_fd, "wb+") as package_file:
|
|
# Add information to package
|
|
if not self.write_pkginfo(package_file):
|
|
UPDATE_LOGGER.print_log("Write pkginfo failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
# Add component to package
|
|
self.compinfo_offset = UPGRADE_FILE_HEADER_LEN
|
|
self.component_offset = UPGRADE_FILE_HEADER_LEN + \
|
|
self.head_list.entry_count * self.upgrade_compinfo_size + \
|
|
UPGRADE_RESERVE_LEN + SIGN_SHA256_LEN + SIGN_SHA384_LEN
|
|
for i in range(0, self.head_list.entry_count):
|
|
UPDATE_LOGGER.print_log("Add component %s" % self.component_list[i].component_addr)
|
|
if not self.write_component_info(self.component_list[i], package_file):
|
|
UPDATE_LOGGER.print_log("write component info failed: %s"
|
|
% self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
if OPTIONS_MANAGER.sd_card and (not hash_check_data.write_component_hash_data(self.component_list[i])):
|
|
UPDATE_LOGGER.print_log("write component hash data failed: %s"
|
|
% self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
|
|
try:
|
|
# Add descriptPackageId to package
|
|
package_file.seek(self.compinfo_offset)
|
|
package_file.write(
|
|
(self.head_list.describe_package_id.decode().ljust(UPGRADE_RESERVE_LEN, "\0")).encode())
|
|
except IOError:
|
|
UPDATE_LOGGER.print_log("Add descriptPackageId failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
self.hash_info_offset = self.compinfo_offset + UPGRADE_RESERVE_LEN
|
|
if OPTIONS_MANAGER.sd_card:
|
|
try:
|
|
# Add hash check data to package
|
|
hash_check_data.write_hashdata()
|
|
package_file.seek(self.hash_info_offset)
|
|
package_file.write(hash_check_data.hashinfo_value + hash_check_data.hashdata)
|
|
self.hash_info_offset += len(hash_check_data.hashinfo_value + hash_check_data.hashdata)
|
|
|
|
except IOError:
|
|
UPDATE_LOGGER.print_log("Add hash check data failed!", log_type=UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
self.sign_header(SIGN_ALGO_RSA, hash_check_data, package_file)
|
|
self.component_offset = self.hash_info_offset
|
|
for i in range(0, self.head_list.entry_count):
|
|
if not self.write_component(self.component_list[i], package_file):
|
|
UPDATE_LOGGER.print_log("write component failed: %s"
|
|
% self.component_list[i].component_addr, UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
UPDATE_LOGGER.print_log("Write update package complete")
|
|
return True |