mirror of
https://gitee.com/openharmony/update_packaging_tools
synced 2024-11-23 06:40:03 +00:00
6e2fcafe93
Signed-off-by: unknown <zhangchun39@huawei.com>
420 lines
14 KiB
Python
420 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2021 Huawei Device Co., Ltd.
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
Description : Create script file for updater
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from decimal import getcontext
|
|
from decimal import Decimal
|
|
|
|
from log_exception import VendorExpandError
|
|
from log_exception import UPDATE_LOGGER
|
|
from utils import OPTIONS_MANAGER
|
|
from utils import PARTITION_FILE
|
|
from utils import TOTAL_SCRIPT_FILE_NAME
|
|
from utils import SCRIPT_FILE_NAME
|
|
from utils import SCRIPT_KEY_LIST
|
|
from utils import UPDATE_BIN_FILE_NAME
|
|
|
|
|
|
class Script:
|
|
|
|
def __init__(self):
|
|
self.script = []
|
|
self.version = 0
|
|
self.info = {}
|
|
|
|
def add_command(self, cmd=None):
|
|
"""
|
|
Add command content to the script.
|
|
:param cmd: command content
|
|
:return:
|
|
"""
|
|
self.script.append(cmd)
|
|
|
|
def get_script(self):
|
|
"""
|
|
Get the script list.
|
|
:return: script list
|
|
"""
|
|
return self.script
|
|
|
|
def sha_check(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'sha_check')
|
|
|
|
def image_sha_check(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'image_sha_check')
|
|
|
|
def first_block_check(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'first_block_check')
|
|
|
|
def image_patch(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'image_patch')
|
|
|
|
def abort(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'abort')
|
|
|
|
def show_progress(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'show_progress')
|
|
|
|
def block_update(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'block_update')
|
|
|
|
def raw_image_write(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'raw_image_write')
|
|
|
|
def get_status(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'get_status')
|
|
|
|
def set_status(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'set_status')
|
|
|
|
def reboot_now(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'reboot_now')
|
|
|
|
def updater_partitions(self, *args, **kwargs):
|
|
raise VendorExpandError(type(self), 'updater_partitions')
|
|
|
|
|
|
class PreludeScript(Script):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
|
|
class VerseScript(Script):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
def sha_check(self, ranges_str, expected_sha, partition):
|
|
"""
|
|
Get the sha_check command.
|
|
:param ranges_str: ranges string
|
|
:param expected_sha: hash value
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = ('sha_check("/{partition}", "{ranges_str}", '
|
|
'"{expected_sha}")').format(
|
|
ranges_str=ranges_str,
|
|
expected_sha=expected_sha, partition=partition)
|
|
return cmd
|
|
|
|
def image_sha_check(self, partition, src_size, src_hash,
|
|
dst_size, dst_hash):
|
|
"""
|
|
Get the image_sha_check command.
|
|
:param ranges_str: ranges string
|
|
:param expected_sha: hash value
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = ('image_sha_check("/{partition}", '
|
|
'"{src_size}", "{src_hash}", '
|
|
'"{dst_size}", "{dst_hash}")').format(
|
|
partition=partition, src_size=src_size, src_hash=src_hash,
|
|
dst_size=dst_size, dst_hash=dst_hash)
|
|
return cmd
|
|
|
|
def first_block_check(self, partition):
|
|
"""
|
|
Get the first_block_check command.
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = 'first_block_check("/{partition}")'.format(
|
|
partition=partition)
|
|
return cmd
|
|
|
|
def abort(self, partition):
|
|
"""
|
|
Get the abort command.
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = 'abort("ERROR: {partition} partition ' \
|
|
'fails to incremental check!");\n'.format(
|
|
partition=partition)
|
|
return cmd
|
|
|
|
def show_progress(self, start_progress, dur):
|
|
"""
|
|
Get the show_progress command.
|
|
'dur' may be zero to advance the progress via SetProgress
|
|
:param start_progress: start progress
|
|
:param dur: seconds
|
|
:return:
|
|
"""
|
|
cmd = 'show_progress({start_progress}, {dur});\n'.format(
|
|
start_progress=float(start_progress), dur=float(dur))
|
|
return cmd
|
|
|
|
def image_patch(self, partition, src_size, src_hash, target_size, target_hash):
|
|
"""
|
|
Get the image_patch command.
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = 'image_patch("/{partition}", "{src_size}", ' \
|
|
'"{src_hash}", "{target_size}", "{target_hash}", ' \
|
|
'"{partition}.patch.dat");\n'.format(partition=partition, src_size=src_size,
|
|
src_hash=src_hash, target_size=target_size, target_hash=target_hash)
|
|
return cmd
|
|
|
|
def block_update(self, partition):
|
|
"""
|
|
Get the block_update command.
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = 'block_update("/{partition}", ' \
|
|
'"{partition}.transfer.list", "{partition}.new.dat", ' \
|
|
'"{partition}.patch.dat");\n'.format(partition=partition)
|
|
return cmd
|
|
|
|
def image_write(self, partition, image_name, image_path):
|
|
return self.raw_image_write(partition, image_name)
|
|
|
|
def raw_image_write(self, partition, image_name):
|
|
"""
|
|
Get the raw_image_write command.
|
|
:param partition: image name
|
|
:return:
|
|
"""
|
|
cmd = 'raw_image_write("/%s", "/%s");\n' % (partition, image_name)
|
|
return cmd
|
|
|
|
def get_status(self):
|
|
"""
|
|
Get the get_status command.
|
|
:return:
|
|
"""
|
|
cmd = 'get_status("/misc")'
|
|
return cmd
|
|
|
|
def set_status(self, status_value):
|
|
"""
|
|
Get the set_status command.
|
|
:param status_value: status value to be set
|
|
:return:
|
|
"""
|
|
cmd = 'set_status("/misc", %s);\n' % status_value
|
|
return cmd
|
|
|
|
def reboot_now(self):
|
|
"""
|
|
Get the reboot_now command.
|
|
:return:
|
|
"""
|
|
cmd = 'reboot_now();\n'
|
|
return cmd
|
|
|
|
def updater_partitions(self):
|
|
"""
|
|
Get the updater_partitions command.
|
|
:return:
|
|
"""
|
|
cmd = 'update_partitions("/%s");\n' % PARTITION_FILE
|
|
return cmd
|
|
|
|
def full_image_update(self, update_file_name):
|
|
cmd = 'update_from_bin("%s");\n' % update_file_name
|
|
return cmd
|
|
|
|
def pkg_extract(self, pkg_file_name, dest_path):
|
|
cmd = 'pkg_extract("%s", "%s");\n' % (pkg_file_name, dest_path)
|
|
return cmd
|
|
|
|
|
|
class RefrainScript(Script):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
|
|
class EndingScript(Script):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
|
|
def write_script(script_content, opera_name):
|
|
"""
|
|
Generate the {opera}script.
|
|
:param script_content: script content
|
|
:param opera_name: Opera phase names corresponding to the script content
|
|
'prelude', 'verse', 'refrain', and 'ending'.
|
|
:return:
|
|
"""
|
|
script_file = tempfile.NamedTemporaryFile(mode='w+')
|
|
script_file.write(script_content)
|
|
script_file.seek(0)
|
|
script_file_name = ''.join([opera_name.title(), SCRIPT_FILE_NAME])
|
|
OPTIONS_MANAGER.opera_script_file_name_dict[opera_name].\
|
|
append((script_file_name, script_file))
|
|
UPDATE_LOGGER.print_log("%s generation complete!" % script_file_name)
|
|
|
|
|
|
def generate_total_script():
|
|
"""
|
|
Generate the overall script.
|
|
"""
|
|
content_list = []
|
|
for each_key, each_value in \
|
|
OPTIONS_MANAGER.opera_script_file_name_dict.items():
|
|
for each in each_value:
|
|
each_content = "LoadScript(\"%s\", %s);" % \
|
|
(each[0], SCRIPT_KEY_LIST.index(each_key))
|
|
content_list.append(each_content)
|
|
script_total = tempfile.NamedTemporaryFile(mode='w+')
|
|
script_total.write('\n'.join(content_list))
|
|
script_total.seek(0)
|
|
OPTIONS_MANAGER.total_script_file_obj = script_total
|
|
UPDATE_LOGGER.print_log("%s generation complete!" % TOTAL_SCRIPT_FILE_NAME)
|
|
|
|
|
|
def get_progress_value(distributable_value=100):
|
|
"""
|
|
Allocate a progress value to each image update.
|
|
:param distributable_value: distributable value
|
|
:return:
|
|
"""
|
|
progress_value_dict = {}
|
|
full_img_list = OPTIONS_MANAGER.full_img_list
|
|
incremental_img_list = OPTIONS_MANAGER.incremental_img_list
|
|
file_size_list = []
|
|
each_img_size = 0
|
|
if len(full_img_list) == 0 and len(incremental_img_list) == 0:
|
|
UPDATE_LOGGER.print_log(
|
|
"get progress value failed! > getting progress value failed!",
|
|
UPDATE_LOGGER.ERROR_LOG)
|
|
return False
|
|
for partition in incremental_img_list:
|
|
# Obtain the size of the incremental image file.
|
|
if partition in OPTIONS_MANAGER.incremental_image_file_obj_dict:
|
|
file_obj = OPTIONS_MANAGER.incremental_image_file_obj_dict[partition]
|
|
each_img_size = os.path.getsize(file_obj.name)
|
|
elif partition in OPTIONS_MANAGER.incremental_block_file_obj_dict:
|
|
new_dat_file_obj, patch_dat_file_obj, transfer_list_file_obj =\
|
|
OPTIONS_MANAGER.incremental_block_file_obj_dict[partition].get_file_obj()
|
|
each_img_size = os.path.getsize(new_dat_file_obj.name) + os.path.getsize(patch_dat_file_obj.name)
|
|
file_size_list.append(each_img_size)
|
|
|
|
total_full_img_size = 0
|
|
for idx, _ in enumerate(full_img_list):
|
|
# Obtain the size of the full image file.
|
|
file_obj = OPTIONS_MANAGER.full_image_file_obj_list[idx]
|
|
total_full_img_size += os.path.getsize(file_obj.name)
|
|
file_size_list.append(total_full_img_size)
|
|
|
|
proportion_value_list = get_proportion_value_list(
|
|
file_size_list, distributable_value=distributable_value)
|
|
|
|
adjusted_proportion_value_list = adjust_proportion_value_list(
|
|
proportion_value_list, distributable_value)
|
|
|
|
all_img_list = incremental_img_list + [UPDATE_BIN_FILE_NAME]
|
|
current_progress = 40
|
|
for idx, each_img in enumerate(all_img_list):
|
|
temp_progress = current_progress + adjusted_proportion_value_list[idx]
|
|
progress_value_dict[each_img] = (current_progress, temp_progress)
|
|
current_progress = temp_progress
|
|
return progress_value_dict
|
|
|
|
|
|
def get_proportion_value_list(file_size_list, distributable_value=100):
|
|
"""
|
|
Obtain the calculated progress proportion value list
|
|
(proportion_value_list).
|
|
:param file_size_list: file size list
|
|
:param distributable_value: distributable value
|
|
:return proportion_value_list: progress proportion value list
|
|
"""
|
|
sum_size = sum(file_size_list)
|
|
getcontext().prec = 2
|
|
proportion_value_list = []
|
|
for each_size_value in file_size_list:
|
|
proportion = Decimal(str(float(each_size_value))) / Decimal(
|
|
str(float(sum_size)))
|
|
proportion_value = int(
|
|
Decimal(str(proportion)) *
|
|
Decimal(str(float(distributable_value))))
|
|
if proportion_value == 0:
|
|
proportion_value = 1
|
|
proportion_value_list.append(proportion_value)
|
|
return proportion_value_list
|
|
|
|
|
|
def adjust_proportion_value_list(proportion_value_list, distributable_value):
|
|
"""
|
|
Adjust the calculated progress proportion value list to ensure that
|
|
sum is equal to distributable_value.
|
|
:param proportion_value_list: calculated progress proportion value list
|
|
:param distributable_value: number of distributable progress values
|
|
:return proportion_value_list: new progress proportion value list
|
|
"""
|
|
if len(proportion_value_list) == 0:
|
|
return []
|
|
sum_proportion_value = sum(proportion_value_list)
|
|
if sum_proportion_value > distributable_value:
|
|
max_value = max(proportion_value_list)
|
|
max_idx = proportion_value_list.index(max_value)
|
|
proportion_value_list[max_idx] = \
|
|
max_value - (sum_proportion_value - distributable_value)
|
|
elif sum_proportion_value < distributable_value:
|
|
min_value = min(proportion_value_list)
|
|
min_idx = proportion_value_list.index(min_value)
|
|
proportion_value_list[min_idx] = \
|
|
min_value + (distributable_value - sum_proportion_value)
|
|
return proportion_value_list
|
|
|
|
|
|
def create_script(prelude_script, verse_script,
|
|
refrain_script, ending_script):
|
|
"""
|
|
Generate the script file.
|
|
:param prelude_script: prelude script
|
|
:param verse_script: verse script
|
|
:param refrain_script: refrain script
|
|
:param ending_script: ending script
|
|
:return:
|
|
"""
|
|
# Generate the prelude script.
|
|
prelude_script.add_command("\n# ---- prelude ----\n")
|
|
|
|
# Get the distribution progress.
|
|
progress_value_dict = get_progress_value()
|
|
if progress_value_dict is False:
|
|
return False
|
|
verse_script_content_list = verse_script.get_script()
|
|
updater_content = []
|
|
verse_script_content = '\n'.join(verse_script_content_list)
|
|
|
|
for key, value in progress_value_dict.items():
|
|
show_progress_content = \
|
|
verse_script.show_progress((value[1] - value[0]) / 100, 0)
|
|
verse_script_content = \
|
|
re.sub(r'%s_WRITE_FLAG' % key, '%s' % show_progress_content,
|
|
verse_script_content, count=1)
|
|
# Generate the verse script.
|
|
write_script(verse_script_content, 'verse')
|
|
# Generate the refrain script.
|
|
refrain_script.add_command("\n# ---- refrain ----\n")
|
|
# Generate the ending script.
|
|
ending_script.add_command("\n# ---- ending ----\n")
|
|
|
|
generate_total_script()
|