update OpenHarmony 2.0 Canary

mamingshuai 2021-06-02 00:04:20 +08:00
parent 8f86bfb90b
commit 84fd413b20
20 changed files with 4369 additions and 61 deletions

.gitattributes vendored Normal file

@@ -0,0 +1,15 @@
*.tgz filter=lfs diff=lfs merge=lfs -text
*.trp filter=lfs diff=lfs merge=lfs -text
*.apk filter=lfs diff=lfs merge=lfs -text
*.jar filter=lfs diff=lfs merge=lfs -text
*.mp4 filter=lfs diff=lfs merge=lfs -text
*.zip filter=lfs diff=lfs merge=lfs -text
*.asm filter=lfs diff=lfs merge=lfs -text
*.8svn filter=lfs diff=lfs merge=lfs -text
*.9svn filter=lfs diff=lfs merge=lfs -text
*.dylib filter=lfs diff=lfs merge=lfs -text
*.exe filter=lfs diff=lfs merge=lfs -text
*.a filter=lfs diff=lfs merge=lfs -text
*.so filter=lfs diff=lfs merge=lfs -text
*.bin filter=lfs diff=lfs merge=lfs -text
*.dll filter=lfs diff=lfs merge=lfs -text

LICENSE Normal file

@@ -0,0 +1,176 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

OAT.xml Executable file

@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2021 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Notes:
This is the project config file for the OpenHarmony OSS Audit Tool. If you have any questions or concerns, please email chenyaxun.
-->
<!-- OAT (OSS Audit Tool) configuration guide:
basedir: Root dir; basedir + project path is the real source file location.
licensefile:
1. If the project does not have a "LICENSE" file in its root dir, please define all the license files of this project in <licensefile>; OAT will check license files according to this rule.
tasklist (only for batch mode):
1. task: Defines an OAT check thread; each task will start a new thread.
2. task name: Only a name; it has no practical effect.
3. task policy: Default policy for projects under this task. This field is required, and the specified policy must be defined in policylist.
4. task filter: Default filefilter for projects under this task. This field is required, and the specified filefilter must be defined in filefilterlist.
5. task project: Projects to be checked; the path field defines the source root dir of the project.
policyList:
1. policy: All policyitems will be merged into the default OAT.xml rules; the name of a policy does not affect the OAT check process.
2. policyitem: The fields type, name, path, and desc are required; the fields rule, group, and filefilter are optional, with the default values:
<policyitem type="" name="" path="" desc="" rule="may" group="defaultGroup" filefilter="defaultPolicyFilter"/>
3. policyitem type:
"compatibility" is used to check license compatibility in the specified path;
"license" is used to check the source license header in the specified path;
"copyright" is used to check the source copyright header in the specified path;
"import" is used to check source dependencies in the specified path, such as import ..., include ...
"filetype" is used to check file types in the specified path; supported file types: archive, binary
"filename" is used to check whether the specified file exists in the specified path (projectroot is supported in the default OAT.xml); supported file names: LICENSE, README, README.OpenSource
4. policyitem name: This field defines the license or copyright to match. "*" matches everything, and the "!" prefix means the value must not match. For example, "!GPL" means the GPL license cannot be used.
5. policyitem path: This field defines the source file scope to which this policyitem applies. The "!" prefix means the files are excluded. For example, "!.*/lib/.*" means files in the lib dir are excluded while processing this policyitem.
6. policyitem rule and group: These two fields are used together to merge policy results. For "may" policyitems in the same group, if any one of them passes, the group result is passed.
7. policyitem filefilter: Used to bind the filefilter that defines the filter rules.
8. filefilter: Filter rules; the type filename filters by file name, and the type filepath filters by file path.
Note: If the text contains special characters, please escape them according to the following rules:
" == &quot;
& == &amp;
' == &apos;
< == &lt;
> == &gt;
-->
<configuration>
<oatconfig>
<licensefile></licensefile>
<policylist>
<policy name="projectPolicy" desc="">
<!--policyitem type="compatibility" name="curl" path=".*" rule="may" group="defaultGroup" filefilter="defaultPolicyFilter" desc=""/-->
<!--policyitem type="compatibility" name="GPL-2.0+" path=".*" desc="不使用或者使用但是是独立进程被X进程调用自研进程不受GPL影响"/>
<policyitem type="license" name="LGPL" path=".*" desc="未使用或者使用了被X进程以动态链接方式调用"/>
<policyitem type="copyright" name="xxx" path=".*" rule="may" group="defaultGroup" filefilter="copyrightPolicyFilter" desc="xxxx开发代码"/-->
</policy>
</policylist>
<filefilterlist>
<filefilter name="binaryFileTypePolicyFilter" desc="Filters for binary file policies" >
<filteritem type="filepath" name="lib/libpackage.so" desc="Self-developed binary; its source code lives in this updater repository, so its license is the repository license, and the binary contains no third-party files"/>
</filefilter>
</filefilterlist>
</oatconfig>
</configuration>

@@ -1,36 +0,0 @@
# update_packaging_tools
#### Description
Packaging tools | Update package creation tool
#### Software Architecture
Software architecture description
#### Installation
1. xxxx
2. xxxx
3. xxxx
#### Instructions
1. xxxx
2. xxxx
3. xxxx
#### Contribution
1. Fork the repository
2. Create Feat_xxx branch
3. Commit your code
4. Create Pull Request
#### Gitee Feature
1. You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md
2. Gitee blog [blog.gitee.com](https://blog.gitee.com)
3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore)
4. The most valuable open source project [GVP](https://gitee.com/gvp)
5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help)
6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

README.md Normal file → Executable file

@@ -1,37 +1,85 @@
-# update_packaging_tools
+# Packaging Tool<a name="EN-US_TOPIC_0000001101934690"></a>
-#### Introduction
-Packaging tools | Update package creation tool
+- [Introduction](#section184mcpsimp)
+- [Directory Structure](#section191mcpsimp)
+- [Description](#section211mcpsimp)
+- [Repositories Involved](#section247mcpsimp)
-#### Software Architecture
-Software architecture description
+## Introduction<a name="section184mcpsimp"></a>
+The packaging tool is used to prepare an update package. It provides the following functions:
-#### Installation
+- Creating a full update package: The update package contains only the data necessary for full image update.
-1. xxxx
-2. xxxx
-3. xxxx
+- Creating a differential update package: The update package contains only the data necessary for differential image update.
-#### Instructions
+- Creating an update package with changeable partitions: The update package contains the partition table and full image data, which are used for partition change processing and image restoration after partition change.
-1. xxxx
-2. xxxx
-3. xxxx
+## Directory Structure<a name="section191mcpsimp"></a>
-#### Contribution
+```
+/base/update/packaging_tools
+├── lib                      # Dependency libraries of the packaging tool
+├── blocks_manager.py        # BlocksManager class for block management
+├── build_update.py          # Entry to the packaging tool for differential update packages
+├── gigraph_process.py       # Stash for re-sorting the ActionList
+├── image_class.py           # Full image and sparse image parsing
+├── log_exception.py         # Global log system with custom exceptions
+├── patch_package_process.py # Differential image processing for obtaining the patch difference through differential calculation on blocks
+├── script_generator.py      # Update script generator
+├── transfers_manager.py     # ActionInfo object creation
+├── update_package.py        # Update package format management and update package writing
+├── utils.py                 # Options management and related functions
+└── vendor_script.py         # Extended update scripts
+```
-1. Fork the repository
-2. Create a Feat_xxx branch
-3. Commit your code
-4. Create a Pull Request
+## Description<a name="section211mcpsimp"></a>
+Running environment:
-#### Gitee Feature
+- Ubuntu 18.04 or later
+- Python 3.5 or later
+- Python library xmltodict, which is used to parse XML files and needs to be installed independently
+- bsdiff executable program, which performs differential calculation to generate the patch package
+- imgdiff executable program, which performs differential calculation on the zip, gz, and lz4 files to generate the patch package
+- e2fsdroid executable program, which performs differential calculation to generate the map files of an image
+Parameter configuration:
+```
+Positional arguments:
+  target_package        Target package file path.
+  update_package        Update package file path.
+Optional arguments:
+  -h, --help            Show this help message and exit.
+  -s SOURCE_PACKAGE, --source_package SOURCE_PACKAGE    Source package file path.
+  -nz, --no_zip         No-zip mode, which outputs the update package without zipping it.
+  -pf PARTITION_FILE, --partition_file PARTITION_FILE   Variable partition mode, which requires the partition list file path.
+  -sa {ECC,RSA}, --signing_algorithm {ECC,RSA}          Signing algorithms supported by the tool, including ECC and RSA.
+  -ha {sha256,sha384}, --hash_algorithm {sha256,sha384} Hash algorithms supported by the tool, including sha256 and sha384.
+  -pk PRIVATE_KEY, --private_key PRIVATE_KEY            Private key file path.
+```
+Example code for creating a full update package:
+```
+python build_update.py ./target/ ./target/package -pk ./target/updater_config/rsa_private_key2048.pem
+```
+Example code for creating a differential update package:
+```
+python build_update.py -s source.zip ./target/ ./target/package -pk ./target/updater_config/rsa_private_key2048.pem
+```
+## Repositories Involved<a name="section247mcpsimp"></a>
+Update subsystem
+**update\_packaging\_tools**
-1. Use Readme\_XXX.md to support different languages, such as Readme\_en.md and Readme\_zh.md
-2. Gitee official blog: [blog.gitee.com](https://blog.gitee.com)
-3. Explore excellent open source projects on Gitee: [https://gitee.com/explore](https://gitee.com/explore)
-4. [GVP](https://gitee.com/gvp), short for Gitee Most Valuable Project, is a selection of comprehensively evaluated excellent open source projects
-5. The official Gitee user manual: [https://gitee.com/help](https://gitee.com/help)
-6. Gitee Stars, a column showcasing outstanding Gitee members: [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

README_zh.md Executable file

@@ -0,0 +1,85 @@
# Packaging Tool<a name="ZH-CN_TOPIC_0000001101934690"></a>
- [Introduction](#section184mcpsimp)
- [Directory Structure](#section191mcpsimp)
- [Description](#section211mcpsimp)
- [Repositories Involved](#section247mcpsimp)
## Introduction<a name="section184mcpsimp"></a>
The packaging tool is used to create update packages. Its main functions are creating full update packages, differential update packages, and update packages with changeable partitions.
- Full update package: contains only the data required for full image update;
- Differential update package: contains only the data required for differential image update;
- Update package with changeable partitions: contains the partition table and full image data, used for partition change processing and image restoration after the partition change.
## Directory Structure<a name="section191mcpsimp"></a>
```
/base/update/packaging_tools
├── lib                      # Dependency libraries of the packaging tool
├── blocks_manager.py        # BlocksManager class definition for block management
├── build_update.py          # Entry code of the differential package creation tool; entry parameter definitions
├── gigraph_process.py       # Stash generation and re-ordering of the ActionList
├── image_class.py           # Full image and sparse image parsing
├── log_exception.py         # Global log system with custom exceptions
├── patch_package_process.py # Differential image processing; obtains patch differences through block-level differential calculation
├── script_generator.py      # Update script generator
├── transfers_manager.py     # ActionInfo object creation
├── update_package.py        # Update package format management and update package writing
├── utils.py                 # Options management and other related utility functions
└── vendor_script.py         # Vendor update process script extensions
```
## Description<a name="section211mcpsimp"></a>
Tool runtime environment:
- Ubuntu 18.04 or later;
- Python 3.5 or later;
- Python library xmltodict, which parses XML files and needs to be installed separately;
- bsdiff executable program, which performs differential calculation to generate the patch;
- imgdiff executable program, which performs differential calculation on zip, gz, and lz4 files to generate the patch;
- e2fsdroid executable program, which performs differential calculation to generate the map files of an image.
Tool parameter description:
```
positional arguments:
  target_package        Target package file path.
  update_package        Update package file path.
optional arguments:
  -h, --help            show this help message and exit
  -s SOURCE_PACKAGE, --source_package SOURCE_PACKAGE    Source package file path.
  -nz, --no_zip         No-zip mode; output the update package without zipping it.
  -pf PARTITION_FILE, --partition_file PARTITION_FILE   Variable partition mode; partition list file path.
  -sa {ECC,RSA}, --signing_algorithm {ECC,RSA}          The signing algorithms supported by the tool include ['ECC', 'RSA'].
  -ha {sha256,sha384}, --hash_algorithm {sha256,sha384} The hash algorithms supported by the tool include ['sha256', 'sha384'].
  -pk PRIVATE_KEY, --private_key PRIVATE_KEY            Private key file path.
```
Example command for creating a full update package:
```
python build_update.py ./target/ ./target/package -pk ./target/updater_config/rsa_private_key2048.pem
```
Example command for creating a differential update package:
```
python build_update.py -s source.zip ./target/ ./target/package -pk ./target/updater_config/rsa_private_key2048.pem
```
## Repositories Involved<a name="section247mcpsimp"></a>
Update subsystem
**update\_packaging\_tools**

blocks_manager.py Normal file

@@ -0,0 +1,229 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
import itertools
import operator
class BlocksManager(object):
"""
Blocks manager: stores a block set as a sorted, flattened tuple of half-open (start, end) range pairs.
"""
def __init__(self, range_data=None):
self.monotonic = False
if isinstance(range_data, str):
self.__parse_data_text(range_data)
elif range_data:
if len(range_data) % 2 != 0:
raise RuntimeError
self.range_data = tuple(self.__remove_repeated_pairs(range_data))
self.monotonic = all(
x < y for x, y in zip(self.range_data, self.range_data[1:]))
else:
self.range_data = ()
def __iter__(self):
for i in range(0, len(self.range_data), 2):
yield self.range_data[i:i + 2]
def __eq__(self, other):
return self.range_data == other.range_data
def __ne__(self, other):
return self.range_data != other.range_data
def __parse_data_text(self, text):
"""
Parse data from text content.
"""
data = []
monotonic = True
last = -1
for split_content in text.split():
if "-" in split_content:
start_value, end_value = \
(int(n) for n in split_content.split("-"))
data.append(start_value)
data.append(end_value + 1)
if last <= start_value <= end_value:
last = end_value
else:
monotonic = False
else:
int_content = int(split_content)
data.append(int_content)
data.append(int_content + 1)
if last <= int_content:
last = int_content + 1
else:
monotonic = False
data.sort()
self.range_data = tuple(self.__remove_repeated_pairs(data))
self.monotonic = monotonic
@staticmethod
def __remove_repeated_pairs(source):
"""
Remove repeated blocks.
"""
new = None
for num in source:
if num == new:
new = None
else:
if new is not None:
yield new
new = num
if new is not None:
yield new
def to_string_raw(self):
if len(self.range_data) == 0:
raise RuntimeError
return "".join([str(len(self.range_data)), ",", ",".join(
str(i) for i in self.range_data)])
def get_union_with_other(self, other):
"""
Obtain the union.
"""
range_a = self.get_subtract_with_other(other)
range_b = other.get_subtract_with_other(self)
range_c = self.get_intersect_with_other(other)
range_e, range_f, range_g = \
list(range_a.range_data), list(range_b.range_data), list(
range_c.range_data)
range_d = []
range_d.extend(range_e)
range_d.extend(range_f)
range_d.extend(range_g)
range_d.sort()
return BlocksManager(range_data=range_d)
def get_intersect_with_other(self, other):
"""
Obtain the intersection.
"""
data, other_data, new_data = list(self.range_data), list(
other.range_data), []
for i in range(len(data) // 2):
for j in range(len(other_data) // 2):
data_list1 = [data[i * 2], data[i * 2 + 1], other_data[j * 2],
other_data[j * 2 + 1]]
data_list2 = [other_data[j * 2], other_data[j * 2 + 1],
data[i * 2], data[i * 2 + 1]]
sort_list = [data[i * 2], data[i * 2 + 1], other_data[j * 2],
other_data[j * 2 + 1]]
sort_list.sort()
if operator.ne(sort_list, data_list1) and \
operator.ne(sort_list, data_list2):
new_data.append(sort_list[1])
new_data.append(sort_list[2])
return BlocksManager(range_data=new_data)
def get_subtract_with_other(self, other):
"""
Obtain the difference set.
"""
intersect_ran = self.get_intersect_with_other(other)
data, intersect_data = list(self.range_data), list(
intersect_ran.range_data)
new_data = data + intersect_data
new_data.sort()
return BlocksManager(range_data=new_data)
def is_overlaps(self, other):
"""
Determine whether there is non-empty overlap.
"""
intersect_range = self.get_intersect_with_other(other)
if intersect_range.size():
return True
return False
def size(self):
"""
Obtain the self size.
"""
total = 0
data = list(self.range_data)
for i in range(len(data) // 2):
total += data[i * 2 + 1] - data[i * 2]
return total
def get_map_within(self, other):
"""
When other is a subset of self,
obtain the continuous range starting from 0.
:param other:
:return:
"""
out = []
offset = 0
start = None
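# heapq.merge interleaves both boundary streams in ascending block order.
# Boundaries from self are tagged -5 (range start) / +5 (range end), and
# boundaries from other are tagged -1 / +1, so 'start' and 'offset' track
# the cumulative position of each of other's blocks within self's ranges.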
for be_num, af_num in \
heapq.merge(zip(self.range_data, itertools.cycle((-5, +5))),
zip(other.range_data, itertools.cycle((-1, +1)))):
if af_num == -5:
start = be_num
elif af_num == +5:
offset += be_num - start
start = None
else:
out.append(offset + be_num - start)
return BlocksManager(range_data=out)
def extend_value_to_blocks(self, value):
"""
Extend self
:param value:
:return:
"""
data = list(self.range_data)
remove_data = []
for i in range(len(data) // 2):
data[i * 2 + 1] = data[i * 2 + 1] + value
data[i * 2] = max(0, data[i * 2] - value)
for i in range(len(data) // 2 - 1):
sign_1 = data[i * 2 + 1]
sign_2 = data[(i + 1) * 2]
if sign_1 >= sign_2:
remove_data.append(sign_2)
remove_data.append(sign_1)
for j in remove_data:
data.remove(j)
return BlocksManager(data)
def get_first_block_obj(self, value):
"""
Return the first range pair containing the value.
:param value:
:return:
"""
if self.size() <= value:
return self
data = list(self.range_data)
be_value, af_value = 0, 1
for i in range(len(data) // 2):
be_value += data[i * 2 + 1] - data[i * 2]
if be_value > value:
data[i * 2 + 1] = data[i * 2 + 1] - be_value + value
break
else:
af_value += 1
return BlocksManager(range_data=data[:af_value * 2])
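For reference, here is a minimal usage sketch of the range-pair representation used by this class (the values are illustrative; it assumes blocks_manager.py is importable):

```
from blocks_manager import BlocksManager

# "2-5 9" parses to the half-open pairs (2, 6) and (9, 10).
blocks_a = BlocksManager("2-5 9")
blocks_b = BlocksManager(range_data=(4, 8))
print(blocks_a.size())  # 5 (blocks 2, 3, 4, 5, 9)
print(blocks_a.get_intersect_with_other(blocks_b).range_data)  # (4, 6)
print(blocks_a.get_union_with_other(blocks_b).range_data)  # (2, 8, 9, 10)
print(blocks_a.to_string_raw())  # 4,2,6,9,10
```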

build_update.py Normal file

@@ -0,0 +1,647 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The tool for making updater package.
positional arguments:
target_package Target package file path.
update_package Update package file path.
optional arguments:
-h, --help show this help message and exit
-s SOURCE_PACKAGE, --source_package SOURCE_PACKAGE
Source package file path.
-nz, --no_zip No zip mode,
which means to output the update package without zip.
-pf PARTITION_FILE, --partition_file PARTITION_FILE
Variable partition mode, Partition list file path.
-sa {ECC,RSA}, --signing_algorithm {ECC,RSA}
The signing algorithms
supported by the tool include ['ECC', 'RSA'].
-ha {sha256,sha384}, --hash_algorithm {sha256,sha384}
The hash algorithms
supported by the tool include ['sha256', 'sha384'].
-pk PRIVATE_KEY, --private_key PRIVATE_KEY
Private key file path.
"""
import copy
import filecmp
import os
import argparse
import re
import subprocess
import xmltodict
from gigraph_process import GigraphProcess
from image_class import FullUpdateImage
from image_class import SparseImage
from patch_package_process import PatchProcess
from transfers_manager import TransfersManager
from log_exception import UPDATE_LOGGER
from script_generator import PreludeScript
from script_generator import VerseScript
from script_generator import RefrainScript
from script_generator import EndingScript
from update_package import build_update_package
from utils import OPTIONS_MANAGER
from utils import UPDATER_CONFIG
from utils import parse_partition_file_xml
from utils import unzip_package
from utils import clear_resource
from utils import PRODUCT
from utils import XML_FILE_PATH
from utils import get_update_info
from utils import SCRIPT_KEY_LIST
from utils import PER_BLOCK_SIZE
from vendor_script import create_vendor_script_class
def type_check(arg):
"""
Argument check, which is used to check whether the specified arg is a file.
:param arg: the arg to check
:return: Check result, which is False if the arg is invalid.
"""
if arg is not None and not os.path.exists(arg):
UPDATE_LOGGER.print_log(
"FileNotFoundError, path: %s" % arg, UPDATE_LOGGER.ERROR_LOG)
return False
return arg
def private_key_check(arg):
"""
Argument check, which is used to check whether
the specified arg is a private_key.
:param arg: The arg to check.
:return: Check result, which is False if the arg is invalid.
"""
if arg != "ON_SERVER" and not os.path.isfile(arg):
UPDATE_LOGGER.print_log(
"FileNotFoundError, path: %s" % arg, UPDATE_LOGGER.ERROR_LOG)
return False
return arg
def check_update_package(arg):
"""
Argument check, which is used to check whether
the update package path exists.
:param arg: The arg to check.
:return: Check result
"""
make_dir_path = None
if os.path.exists(arg):
if os.path.isfile(arg):
UPDATE_LOGGER.print_log(
"Update package must be a dir path, not a file path. "
"path: %s" % arg, UPDATE_LOGGER.ERROR_LOG)
return False
else:
try:
UPDATE_LOGGER.print_log(
"Update package path does not exist. The dir will be created!"
"path: %s" % arg, UPDATE_LOGGER.WARNING_LOG)
os.makedirs(arg)
make_dir_path = arg
except OSError:
UPDATE_LOGGER.print_log(
"Make update package path dir failed! "
"path: %s" % arg, UPDATE_LOGGER.ERROR_LOG)
return False
if make_dir_path is not None:
OPTIONS_MANAGER.make_dir_path = make_dir_path
return arg
def create_entrance_args():
"""
Arguments for the tool to create an update package
:return source_package : source version package
target_package : target version package
update_package : update package output path
no_zip : whether to enable the update package zip function.
partition_file : partition table XML file
signing_algorithm : signature algorithm (ECC and RSA (default))
private_key : path of the private key file
"""
description = "Tool for creating update package."
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-s", "--source_package", type=type_check,
default=None, help="Source package file path.")
parser.add_argument("target_package", type=type_check,
help="Target package file path.")
parser.add_argument("update_package", type=check_update_package,
help="Update package file path.")
parser.add_argument("-nz", "--no_zip", action='store_true',
help="No zip mode, Output update package without zip.")
parser.add_argument("-pf", "--partition_file", default=None,
help="Variable partition mode, "
"Partition list file path.")
parser.add_argument("-sa", "--signing_algorithm", default='RSA',
choices=['ECC', 'RSA'],
help="The signing algorithm "
"supported by the tool include ['ECC', 'RSA'].")
parser.add_argument("-ha", "--hash_algorithm", default='sha256',
choices=['sha256', 'sha384'],
help="The hash algorithm "
"supported by the tool include "
"['sha256', 'sha384'].")
parser.add_argument("-pk", "--private_key", type=private_key_check,
default=None, help="Private key file path.")
args = parser.parse_args()
source_package = args.source_package
OPTIONS_MANAGER.source_package = source_package
target_package = args.target_package
OPTIONS_MANAGER.target_package = target_package
update_package = args.update_package
OPTIONS_MANAGER.update_package = update_package
no_zip = args.no_zip
OPTIONS_MANAGER.no_zip = no_zip
partition_file = args.partition_file
OPTIONS_MANAGER.partition_file = partition_file
signing_algorithm = args.signing_algorithm
OPTIONS_MANAGER.signing_algorithm = signing_algorithm
hash_algorithm = args.hash_algorithm
OPTIONS_MANAGER.hash_algorithm = hash_algorithm
private_key = args.private_key
OPTIONS_MANAGER.private_key = private_key
return source_package, target_package, update_package, no_zip, \
partition_file, signing_algorithm, hash_algorithm, private_key
def get_script_obj(script_obj=None):
"""
Get the opera script objects (prelude, verse, refrain, and ending).
:return:
"""
script_obj_list = create_vendor_script_class()
if script_obj_list == [None] * len(SCRIPT_KEY_LIST) and script_obj is None:
prelude_script = PreludeScript()
verse_script = VerseScript()
refrain_script = RefrainScript()
ending_script = EndingScript()
else:
if script_obj_list == [None] * len(SCRIPT_KEY_LIST):
script_obj_list = script_obj
UPDATE_LOGGER.print_log(
"Get vendor extension object completed!"
"The vendor extension script will be generated.")
prelude_script = script_obj_list[0]
verse_script = script_obj_list[1]
refrain_script = script_obj_list[2]
ending_script = script_obj_list[3]
return prelude_script, verse_script, refrain_script, ending_script
def check_incremental_args(no_zip, partition_file, source_package):
"""
When the incremental list is not empty, incremental processing is required.
In this case, check related arguments.
:param no_zip:
:param partition_file:
:param source_package:
:return:
"""
if source_package is None:
UPDATE_LOGGER.print_log(
"The source package is missing, "
"cannot be incrementally processed!",
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
if no_zip:
UPDATE_LOGGER.print_log(
"No ZIP mode, cannot be incrementally processed!",
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
if partition_file is not None:
UPDATE_LOGGER.print_log(
"Partition file is not None, "
"cannot be incrementally processed!",
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
OPTIONS_MANAGER.source_package_temp_obj, \
OPTIONS_MANAGER.source_package_dir = \
unzip_package(source_package, origin='source')
xml_path = ''
if OPTIONS_MANAGER.source_package_dir is not False:
xml_path = os.path.join(OPTIONS_MANAGER.source_package_dir,
UPDATER_CONFIG, XML_FILE_PATH)
if OPTIONS_MANAGER.source_package_dir is False:
OPTIONS_MANAGER.source_package_temp_obj = None
OPTIONS_MANAGER.source_package_dir = None
if os.path.exists(xml_path):
with open(xml_path, 'r') as xml_file:
xml_str = xml_file.read()
else:
UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
xml_path, UPDATE_LOGGER.ERROR_LOG)
return False
xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
package_dict = xml_content_dict.get('package', {})
head_dict = package_dict.get('head', {}).get('info')
OPTIONS_MANAGER.source_package_version = head_dict.get("@softVersion")
if check_package_version(OPTIONS_MANAGER.target_package_version,
OPTIONS_MANAGER.source_package_version) is False:
clear_resource(err_clear=True)
return False
return True
def check_userdata_image():
"""
Check the userdata image. Updating this image is prohibited.
:return:
"""
if 'userdata' in OPTIONS_MANAGER.full_img_list or \
'userdata' in OPTIONS_MANAGER.incremental_img_list:
UPDATE_LOGGER.print_log(
"userdata image does not participate in update!"
"Please check xml config, path: %s!" %
os.path.join(OPTIONS_MANAGER.target_package_config_dir,
XML_FILE_PATH),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
return True
def check_images_list():
"""
Check full_img_list and incremental_img_list.
If their lengths are 0, an error will be logged.
:return:
"""
if len(OPTIONS_MANAGER.full_img_list) == 0 and \
len(OPTIONS_MANAGER.incremental_img_list) == 0:
UPDATE_LOGGER.print_log(
"The image list is empty!"
"Please check xml config, path: %s!" %
os.path.join(OPTIONS_MANAGER.target_package_config_dir,
XML_FILE_PATH),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
return True
def check_target_package_path(target_package):
"""
Check the target_package path.
:param target_package: target package path
:return:
"""
if os.path.isdir(target_package):
OPTIONS_MANAGER.target_package_dir = target_package
temp_dir_list = os.listdir(target_package)
if UPDATER_CONFIG in temp_dir_list:
OPTIONS_MANAGER.target_package_config_dir = \
os.path.join(target_package, UPDATER_CONFIG)
else:
UPDATE_LOGGER.print_log(
"Exception's target package path! path: %s" %
target_package, UPDATE_LOGGER.ERROR_LOG)
return False
elif target_package.endswith('.zip'):
# Decompress the target package.
tmp_dir_obj, unzip_dir = unzip_package(target_package)
if tmp_dir_obj is False or unzip_dir is False:
clear_resource(err_clear=True)
return False
OPTIONS_MANAGER.target_package_dir = unzip_dir
OPTIONS_MANAGER.target_package_temp_obj = tmp_dir_obj
OPTIONS_MANAGER.target_package_config_dir = \
os.path.join(unzip_dir, UPDATER_CONFIG)
else:
UPDATE_LOGGER.print_log(
"Input Update Package type exception! path: %s" %
target_package, UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
return True
def check_miss_private_key(private_key):
"""
Check private key.
:param private_key:
:return:
"""
if private_key is None:
UPDATE_LOGGER.print_log(
"Private key is None, update package cannot be signed! "
"Please specify the signature private key by -pk.",
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
return True
def check_package_version(target_ver, source_ver):
"""
:param target_ver: target version string
:param source_ver: source version string
:return: True if the target version is newer than the source version
"""
try:
target_num = ''.join(target_ver.split(' ')[-1].split('.')[1:3])
source_num = ''.join(source_ver.split(' ')[-1].split('.')[1:3])
if int(target_num) <= int(source_num):
UPDATE_LOGGER.print_log(
'Target package version %s <= source package version %s! '
'Unable to make the updater package!' % (target_ver, source_ver),
UPDATE_LOGGER.ERROR_LOG)
return False
except ValueError:
UPDATE_LOGGER.print_log('The package version number is not compliant. '
'Please check the package version number!',
UPDATE_LOGGER.ERROR_LOG)
return False
return True
def increment_image_processing(
verse_script, incremental_img_list, source_package_dir,
target_package_dir):
"""
Incremental image processing
:param verse_script: verse script
:param incremental_img_list: incremental image list
:param source_package_dir: source package path
:param target_package_dir: target package path
:return:
"""
script_check_cmd_list = []
script_write_cmd_list = []
patch_process = None
for each_img in incremental_img_list:
each_src_image_path = \
os.path.join(source_package_dir,
'%s.img' % each_img)
each_src_map_path = \
os.path.join(source_package_dir,
'%s.map' % each_img)
each_tgt_image_path = \
os.path.join(target_package_dir,
'%s.img' % each_img)
each_tgt_map_path = \
os.path.join(target_package_dir,
'%s.map' % each_img)
if not os.path.exists(each_src_image_path):
UPDATE_LOGGER.print_log(
"The source %s.img file is missing from the source package, "
"the component: %s cannot be incrementally processed. "
"path: %s!" %
(each_img, each_img,
os.path.join(source_package_dir, UPDATER_CONFIG,
XML_FILE_PATH)),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
check_make_map_path(each_img)
cmd = ["e2fsdroid", "-B", each_src_map_path,
"-a", "/%s" % each_img, each_src_image_path]
sub_p = subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
sub_p.wait()
if not os.path.exists(each_tgt_image_path):
UPDATE_LOGGER.print_log(
"The target %s.img file is missing from the target package, "
"the component: %s cannot be incrementally processed. "
"Please check xml config, path: %s!" %
(each_img, each_img,
os.path.join(target_package_dir, UPDATER_CONFIG,
XML_FILE_PATH)),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
cmd = ["e2fsdroid", "-B", each_tgt_map_path,
"-a", "/%s" % each_img, each_tgt_image_path]
sub_p = subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
sub_p.wait()
if filecmp.cmp(each_src_image_path, each_tgt_image_path):
UPDATE_LOGGER.print_log(
"Source Image is the same as Target Image!"
"src image path: %s, tgt image path: %s" %
(each_src_image_path, each_tgt_image_path),
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
return False
src_sparse_image = SparseImage(each_src_image_path, each_src_map_path)
tgt_sparse_image = SparseImage(each_tgt_image_path, each_tgt_map_path)
transfers_manager = TransfersManager(
each_img, tgt_sparse_image, src_sparse_image)
transfers_manager.find_process_needs()
actions_list = transfers_manager.get_action_list()
graph_process = GigraphProcess(actions_list, src_sparse_image,
tgt_sparse_image)
actions_list = copy.deepcopy(graph_process.actions_list)
patch_process = PatchProcess(each_img, tgt_sparse_image,
src_sparse_image,
actions_list)
patch_process.patch_process()
patch_process.package_patch_zip.package_patch_zip()
patch_process.write_script(each_img, script_check_cmd_list,
script_write_cmd_list, verse_script)
if not check_patch_file(patch_process):
UPDATE_LOGGER.print_log(
'Verify the incremental result failed!',
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
UPDATE_LOGGER.print_log(
'Verify the incremental result successfully!',
UPDATE_LOGGER.INFO_LOG)
verse_script.add_command(
"\n# ---- start incremental check here ----\n")
for each_check_cmd in script_check_cmd_list:
verse_script.add_command(each_check_cmd)
verse_script.add_command(
"\n# ---- start incremental write here ----\n")
for each_write_cmd in script_write_cmd_list:
verse_script.add_command(each_write_cmd)
return True
def check_patch_file(patch_process):
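"""
Verify the incremental result: the size of the generated new.dat
file must match the 'new' block count recorded in the transfer list
times PER_BLOCK_SIZE, and the size of patch.dat must match the
offset plus length recorded on the last 'diff' line.
"""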
new_dat_file_obj, patch_dat_file_obj, transfer_list_file_obj = \
patch_process.package_patch_zip.get_file_obj()
with open(transfer_list_file_obj.name) as f_t:
num = 0
diff_str = None
diff_num = 0
for line in f_t:
if 'new' in line:
num_list = line.split('\n')[0].split(',')
child_num = (int(num_list[-1]) - int(num_list[-2]))
num += child_num
if 'diff' in line:
diff_str = line
if diff_str:
diff_list = diff_str.split('\n')[0].split(' ')
diff_num = int(diff_list[1]) + int(diff_list[2])
check_flag = \
(os.path.getsize(new_dat_file_obj.name) == num * PER_BLOCK_SIZE) and \
(os.path.getsize(patch_dat_file_obj.name) == diff_num)
return check_flag
def check_make_map_path(each_img):
"""
If env does not exist, the command for map generation does not exist
in the environment variable, and False will be returned.
"""
try:
cmd = ["e2fsdroid", " -h"]
subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
except FileNotFoundError:
UPDATE_LOGGER.print_log(
"Command not found, need check the env! "
"Make %s.map failed!" % each_img,
UPDATE_LOGGER.ERROR_LOG)
clear_resource(err_clear=True)
raise RuntimeError
return True
def main():
"""
Entry function.
"""
OPTIONS_MANAGER.product = PRODUCT
source_package, target_package, update_package, no_zip, \
partition_file, signing_algorithm, hash_algorithm, private_key = \
create_entrance_args()
if source_package is False or private_key is False or \
target_package is False or update_package is False:
return
if check_miss_private_key(private_key) is False:
clear_resource(err_clear=True)
return
if check_target_package_path(target_package) is False:
clear_resource(err_clear=True)
return
if get_update_info() is False:
clear_resource(err_clear=True)
return
if check_images_list() is False:
clear_resource(err_clear=True)
return
if check_userdata_image() is False:
clear_resource(err_clear=True)
return
# Create a Script object.
prelude_script, verse_script, refrain_script, ending_script = \
get_script_obj()
# Create partition.
if partition_file is not None:
verse_script.add_command("\n# ---- do updater partitions ----\n")
updater_partitions_cmd = verse_script.updater_partitions()
verse_script.add_command(updater_partitions_cmd)
partition_file_obj, partitions_list = \
parse_partition_file_xml(partition_file)
if partition_file_obj is False:
clear_resource(err_clear=True)
return False
OPTIONS_MANAGER.partition_file_obj = partition_file_obj
OPTIONS_MANAGER.full_img_list = partitions_list
OPTIONS_MANAGER.two_step = False
# Upgrade the updater image.
if OPTIONS_MANAGER.two_step:
get_status_cmd = verse_script.get_status()
set_status_0_cmd = verse_script.set_status('0')
set_status_1_cmd = verse_script.set_status('1')
reboot_now_cmd = verse_script.reboot_now()
create_updater_script_command = \
'\n# ---- do updater partitions ----\n\n' \
'if ({get_status_cmd} == 0){{\nUPDATER_WRITE_FLAG\n' \
' {set_status_1_cmd} {reboot_now_cmd}}}\n' \
'else{{ \nALL_WRITE_FLAG\n {set_status_0_cmd}}}'.format(
get_status_cmd=get_status_cmd,
set_status_1_cmd=set_status_1_cmd,
set_status_0_cmd=set_status_0_cmd,
reboot_now_cmd=reboot_now_cmd)
verse_script.add_command(create_updater_script_command)
if len(OPTIONS_MANAGER.incremental_img_list) != 0:
if check_incremental_args(no_zip, partition_file, source_package)\
is False:
clear_resource(err_clear=True)
return
if increment_image_processing(
verse_script, OPTIONS_MANAGER.incremental_img_list,
OPTIONS_MANAGER.source_package_dir,
OPTIONS_MANAGER.target_package_dir) is False:
clear_resource(err_clear=True)
return
# Full processing
if len(OPTIONS_MANAGER.full_img_list) != 0:
verse_script.add_command("\n# ---- full image ----\n")
full_image_content_len_list, full_image_file_obj_list = \
FullUpdateImage(OPTIONS_MANAGER.target_package_dir,
OPTIONS_MANAGER.full_img_list, verse_script,
no_zip=OPTIONS_MANAGER.no_zip).\
update_full_image()
if full_image_content_len_list is False or \
full_image_file_obj_list is False:
clear_resource(err_clear=True)
return
OPTIONS_MANAGER.full_image_content_len_list, \
OPTIONS_MANAGER.full_image_file_obj_list = \
full_image_content_len_list, full_image_file_obj_list
# Generate the update package.
build_re = build_update_package(no_zip, update_package,
prelude_script, verse_script,
refrain_script, ending_script)
if build_re is False:
clear_resource(err_clear=True)
return
# Clear resources.
clear_resource()
if __name__ == '__main__':
main()
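As a reference for the version gate above, check_package_version reduces each version string to the integer formed by the second and third dot-separated fields of its last space-separated token. A standalone sketch of that comparison (the version strings below are purely illustrative, not real softVersion values):

```
def version_number(version_string):
    # Mirrors check_package_version: last token, fields 1..2 joined.
    return int("".join(version_string.split(" ")[-1].split(".")[1:3]))

target = "SystemName 3.10.2"  # hypothetical target softVersion
source = "SystemName 3.9.5"   # hypothetical source softVersion
print(version_number(target))  # 102
print(version_number(source))  # 95
# Incremental packaging proceeds only if target > source.
print(version_number(target) > version_number(source))  # True
```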

code_yacc.py Executable file

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import optparse
import subprocess
if __name__ == '__main__':
print("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
parser_obj = optparse.OptionParser()
parser_obj.add_option("--scriptname",
help="generate yacc script name",
action="store_true", default=True)
parser_obj.add_option("--output",
help="yacc output path",
action="store_true", default="")
(option_list, parse_params) = parser_obj.parse_args()
if len(parse_params) < 2:
parser_obj.error("yacc param error.")
gen_script_name = parse_params[0]
output_path = parse_params[1]
parse_scripts = subprocess.check_call(
[gen_script_name], stdout=subprocess.PIPE, cwd=output_path)
print("result:", parse_scripts)

gigraph_process.py Normal file

@@ -0,0 +1,187 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from log_exception import UPDATE_LOGGER
# 50% of the data partition: 1374024 KB, expressed in bytes.
DATA_SIZE = 1374024 * 1024
class GigraphProcess(object):
def __init__(self, actions_list, src_sparse_image, tgt_sparse_image):
self.actions_list = actions_list
if len(self.actions_list) == 0:
raise RuntimeError
self.size_of_source_list = 0
self.src_sparse_img_obj = src_sparse_image
self.tgt_sparse_img_obj = tgt_sparse_image
self.vertices = len(self.actions_list)
self.data_size = DATA_SIZE
self.generate_digraph()
self.stash_process()
def generate_digraph(self):
"""
Start correlation lookup.
"""
source_ranges = []
source_ranges = \
self.get_source_ranges(self.actions_list, source_ranges)
self.get_intersections_dict(source_ranges)
# Start ordering.
topo_logical = TopoLogical(self)
action_stack = topo_logical.stack()
new_action_list = []
for action in action_stack:
action.order = len(new_action_list)
new_action_list.append(action)
self.actions_list = new_action_list
def get_intersections_dict(self, source_ranges):
"""
Get the intersections_dict.
:param source_ranges: source blocks
:return:
"""
for each_action in self.actions_list:
intersections = OrderedDict()
for start_value, end_value in each_action.tgt_block_set:
for i in range(start_value, end_value):
if i >= len(source_ranges):
break
if source_ranges[i] is not None:
for j in source_ranges[i]:
intersections[j] = None
self.update_goes_before_and_after(each_action, intersections)
@staticmethod
def update_goes_before_and_after(each_action, intersections):
"""
Update "goes before" and "goes after".
:param each_action: action to be processed
:param intersections: intersections dict
:return:
"""
for each_intersection in intersections:
if each_action is each_intersection:
continue
intersect_range = \
each_action.tgt_block_set.get_intersect_with_other(
each_intersection.src_block_set)
if intersect_range:
if each_intersection.src_name == "__ZERO":
size = 0
else:
size = intersect_range.size()
each_intersection.child[each_action] = size
each_action.parent[each_intersection] = size
@staticmethod
def get_source_ranges(transfers, source_ranges):
"""
Update "goes before" and "goes after".
:param transfers: actions list
:param source_ranges: source blocks
:return:
"""
for each_action in transfers:
for start_value, end_value in each_action.src_block_set:
if end_value > len(source_ranges):
source_ranges.extend(
[None] * (end_value - len(source_ranges)))
for i in range(start_value, end_value):
if source_ranges[i] is None:
source_ranges[i] = \
OrderedDict.fromkeys([each_action])
else:
source_ranges[i][each_action] = None
return source_ranges
def stash_process(self):
"""
Stash processing
"""
UPDATE_LOGGER.print_log("Reversing backward edges...")
stash_raw_id = 0
for each_action in self.actions_list:
each_child_dict = each_action.child.copy()
for each_before in each_child_dict:
if each_action.order >= each_before.order:
intersect_block_set = \
each_action.src_block_set.get_intersect_with_other(
each_before.tgt_block_set)
each_before.stash_before.append(
(stash_raw_id, intersect_block_set))
each_action.use_stash.append(
(stash_raw_id, intersect_block_set))
stash_raw_id += 1
each_action.child.pop(each_before)
each_before.parent.pop(each_action)
each_action.parent[each_before] = None
each_before.child[each_action] = None
UPDATE_LOGGER.print_log("Reversing backward edges completed!")
class DirectedCycle(object):
def __init__(self, graph):
self.graph = graph
self.marked = [False for _ in range(self.graph.vertices)]
self.has_cycle = False
self.ontrack = [False for _ in range(self.graph.vertices)]
class DepthFirstOrder:
def __init__(self, graph):
self.graph = graph
self.marked = {}
self.stack = []
for each_action in self.graph.actions_list:
self.marked[each_action] = False
def dfs(self):
def dfs(index):
self.marked[index] = True
for each_child in index.child:
if not self.marked[each_child]:
dfs(each_child)
self.stack.insert(0, index)
for each_action in self.graph.actions_list:
if not self.marked[each_action]:
dfs(each_action)
return self.stack
def sort_vertices(self):
return self.dfs()
class TopoLogical(object):
def __init__(self, graph):
self.order = None
self.cycle = DirectedCycle(graph)
if not self.cycle.has_cycle:
dfo = DepthFirstOrder(graph)
self.order = dfo.sort_vertices()
def stack(self):
return self.order
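DepthFirstOrder computes a reverse DFS post-order: an action is prepended to the stack only after all of its children have been visited, so every action is ordered ahead of the actions that depend on it. A self-contained sketch of the same ordering on a toy graph (plain strings stand in for action objects):

```
def topological_order(children):
    # Reverse DFS post-order, mirroring DepthFirstOrder.dfs.
    marked = {node: False for node in children}
    stack = []

    def dfs(node):
        marked[node] = True
        for child in children[node]:
            if not marked[child]:
                dfs(child)
        stack.insert(0, node)

    for node in children:
        if not marked[node]:
            dfs(node)
    return stack

# A must be applied before B and C; C before D.
graph = {"A": ["B", "C"], "B": [], "C": ["D"], "D": []}
print(topological_order(graph))  # ['A', 'C', 'D', 'B']
```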

image_class.py Normal file

@@ -0,0 +1,513 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import bisect
import copy
import os
import struct
import tempfile
from hashlib import sha256
from log_exception import UPDATE_LOGGER
from blocks_manager import BlocksManager
from utils import SPARSE_IMAGE_MAGIC
from utils import HEADER_INFO_FORMAT
from utils import CHUNK_INFO_FORMAT
from utils import HEADER_INFO_LEN
from utils import CHUNK_INFO_LEN
from utils import EXTEND_VALUE
from utils import FILE_MAP_ZERO_KEY
from utils import FILE_MAP_NONZERO_KEY
from utils import FILE_MAP_COPY_KEY
from utils import MAX_BLOCKS_PER_GROUP
from utils import CHUNK_TYPE_RAW
from utils import CHUNK_TYPE_FILL
from utils import CHUNK_TYPE_DONT_CARE
from utils import CHUNK_TYPE_CRC32
class FullUpdateImage:
"""
Full image processing class
"""
def __init__(self, target_package_images_dir, full_img_list, verse_script,
no_zip=False):
self.__target_package_images_dir = target_package_images_dir
self.__full_img_list = full_img_list
self.__verse_script = verse_script
self.__no_zip = no_zip
def update_full_image(self):
"""
Processing of the full image
:return full_image_content_len_list: full image content length list
:return full_image_file_obj_list: full image temporary file list
"""
full_image_file_obj_list = []
full_image_content_len_list = []
for each_name in self.__full_img_list:
full_image_content = self.get_full_image_content(each_name)
if full_image_content is False:
UPDATE_LOGGER.print_log(
"Get full image content failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False, False
each_img = tempfile.NamedTemporaryFile(
prefix="full_image%s" % each_name, mode='wb')
each_img.write(full_image_content)
full_image_content_len_list.append(len(full_image_content))
full_image_file_obj_list.append(each_img)
UPDATE_LOGGER.print_log(
"Image %s full processing completed" % each_name)
if not self.__no_zip:
# Zip mode: add the image write command for this image to the script.
if is_sparse_image(each_img.name):
sparse_image_write_cmd = \
self.__verse_script.sparse_image_write(each_name)
cmd = '%s_WRITE_FLAG%s' % (
each_name, sparse_image_write_cmd)
else:
raw_image_write_cmd = \
self.__verse_script.raw_image_write(each_name)
cmd = '%s_WRITE_FLAG%s' % (
each_name, raw_image_write_cmd)
self.__verse_script.add_command(
cmd=cmd)
UPDATE_LOGGER.print_log(
"All full image processing completed! image count: %d" %
len(self.__full_img_list))
return full_image_content_len_list, full_image_file_obj_list
def get_full_image_content(self, each_name):
"""
Obtain the full image content.
:param each_name: image name
:return content: full image content if available; False otherwise
"""
each_image_path = os.path.join(self.__target_package_images_dir,
'%s.img' % each_name)
if not os.path.exists(each_image_path):
UPDATE_LOGGER.print_log(
"The %s.img file is missing from the target package, "
"the component: %s cannot be full update processed. "
"path: %s" %
(each_name, each_name, each_image_path),
UPDATE_LOGGER.ERROR_LOG)
return False
with open(each_image_path, 'rb') as f_r:
content = f_r.read()
return content
def is_sparse_image(img_path):
"""
Check whether the image is a sparse image.
:param img_path: image path
:return:
"""
with open(img_path, 'rb') as f_r:
image_content = f_r.read(HEADER_INFO_LEN)
header_info = struct.unpack(HEADER_INFO_FORMAT, image_content)
*_, is_sparse = SparseImage.image_header_info_check(header_info)
return is_sparse
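# Illustrative sketch (not part of the original module): the same check,
# self-contained, assuming the conventional sparse header layout packed
# as '<I4H4I' (28 bytes) with magic 0xED26FF3A; the real constants come
# from utils.py (HEADER_INFO_FORMAT, HEADER_INFO_LEN, SPARSE_IMAGE_MAGIC).
def peek_is_sparse(img_path):
    with open(img_path, 'rb') as f_r:
        head = f_r.read(28)
    if len(head) < 28:
        return False
    magic, major, minor, hdr_sz, chunk_hdr_sz, _blk_sz, _total_blks, \
        _total_chunks, _checksum = struct.unpack('<I4H4I', head)
    return (magic == 0xED26FF3A and (major, minor) == (1, 0)
            and hdr_sz == 28 and chunk_hdr_sz == 12)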
class SparseImage:
"""
Sparse image class
"""
def __init__(self, image_path, map_path):
"""
Initialize the sparse image.
:param image_path: img file path
:param map_path: map file path
"""
self.image_flag = True
self.image_path = image_path
self.offset_value_list = []
self.care_block_range = None
self.extended_range = None
self.reserved_blocks = BlocksManager("0")
self.file_map = []
self.offset_index = []
self.block_size = None
self.total_blocks = None
self.parse_sparse_image_file(image_path, map_path)
def parse_sparse_image_file(self, image_path, map_path):
"""
Parse the .img file.
:param image_path: img file path
:param map_path: map file path
"""
with open(image_path, 'rb') as f_r:
image_content = f_r.read(HEADER_INFO_LEN)
header_info = struct.unpack(HEADER_INFO_FORMAT, image_content)
block_size, chunk_header_info_size, header_info_size, magic_info, \
total_blocks, total_chunks, self.image_flag = \
self.image_header_info_check(header_info)
self.block_size = block_size
self.total_blocks = total_blocks
if self.image_flag is False:
UPDATE_LOGGER.print_log(
"This image is not a sparse image! path: %s" % image_path,
UPDATE_LOGGER.ERROR_LOG)
return
UPDATE_LOGGER.print_log("Sparse head info parsing completed!")
pos_value = 0
care_value_list, offset_value_list = [], []
for _ in range(total_chunks):
chunk_info_content = f_r.read(CHUNK_INFO_LEN)
chunk_info = struct.unpack(
CHUNK_INFO_FORMAT, chunk_info_content)
pos_value = self.parse_chunk_info(
block_size, care_value_list, chunk_info, f_r,
offset_value_list, pos_value)
if pos_value is False:
raise RuntimeError
self.care_block_range = BlocksManager(care_value_list)
self.offset_index = [i[0] for i in offset_value_list]
self.offset_value_list = offset_value_list
extended_range = \
self.care_block_range.extend_value_to_blocks(EXTEND_VALUE)
all_blocks = BlocksManager(range_data=(0, total_blocks))
self.extended_range = \
extended_range.get_intersect_with_other(all_blocks).\
get_subtract_with_other(self.care_block_range)
self.parse_block_map_file(map_path, f_r)
@staticmethod
def parse_chunk_info(*args):
"""
Parse the chunk information.
:return pos_value: pos
"""
block_size, care_value_list, chunk_info, f_r,\
offset_value_list, pos_value = args
chunk_type = chunk_info[0]
# Chunk quantity
chunk_size = chunk_info[2]
total_size = chunk_info[3]
data_size = total_size - 12
# Chunk type, which can be CHUNK_TYPE_RAW, CHUNK_TYPE_FILL,
# CHUNK_TYPE_DONT_CARE, or CHUNK_TYPE_CRC32.
if chunk_type == CHUNK_TYPE_RAW:
if data_size != chunk_size * block_size:
UPDATE_LOGGER.print_log(
"chunk_size * block_size: %u and "
"data size: %u is not equal!" %
(data_size, chunk_size * block_size),
UPDATE_LOGGER.ERROR_LOG)
return False
else:
temp_value = pos_value + chunk_size
care_value_list.append(pos_value)
care_value_list.append(temp_value)
offset_value_list.append(
(pos_value, chunk_size, f_r.tell(), None))
pos_value = temp_value
f_r.seek(data_size, os.SEEK_CUR)
elif chunk_type == CHUNK_TYPE_FILL:
temp_value = pos_value + chunk_size
fill_data = f_r.read(4)
care_value_list.append(pos_value)
care_value_list.append(temp_value)
offset_value_list.append((pos_value, chunk_size, None, fill_data))
pos_value = temp_value
elif chunk_type == CHUNK_TYPE_DONT_CARE:
if data_size != 0:
UPDATE_LOGGER.print_log(
"CHUNK_TYPE_DONT_CARE chunk data_size"
" must be 0, data size: (%u)" %
data_size, UPDATE_LOGGER.ERROR_LOG)
return False
else:
pos_value += chunk_size
elif chunk_type == CHUNK_TYPE_CRC32:
UPDATE_LOGGER.print_log(
"Not supported chunk type CHUNK_TYPE_CRC32!",
UPDATE_LOGGER.ERROR_LOG)
return False
else:
UPDATE_LOGGER.print_log(
"Not supported chunk type 0x%04X !" %
chunk_type, UPDATE_LOGGER.ERROR_LOG)
return False
return pos_value
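# For reference, a sketch of the 12-byte chunk header this parser walks,
# assuming the conventional '<2H2I' layout (type, reserved, chunk blocks,
# total bytes; the real format is CHUNK_INFO_FORMAT from utils.py) and
# the conventional RAW type value 0xCAC1:
#     struct.pack('<2H2I', 0xCAC1, 0, 8, 8 * 4096 + 12)
# unpacks to chunk_type=0xCAC1, chunk_size=8, total_size=32780, so
# data_size = total_size - 12 == 8 * 4096, exactly the
# chunk_size * block_size payload that the equality check above enforces.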
def parse_block_map_file(self, map_path, image_file_r):
"""
Parses the map file for blocks where files are contained in the image.
:param map_path: map file path
:param image_file_r: file reading object
:return:
"""
remain_range = self.care_block_range
temp_file_map = {}
with open(map_path, 'r') as f_r:
# Read the .map file and process each line.
for each_line in f_r.readlines():
each_map_path, ranges_value = each_line.split(None, 1)
each_range = BlocksManager(ranges_value)
temp_file_map[each_map_path] = each_range
# each_range is contained in the remain range.
if each_range.size() != each_range.\
get_intersect_with_other(remain_range).size():
raise RuntimeError
# After the processing is complete,
# remove each_range from remain_range.
remain_range = remain_range.get_subtract_with_other(each_range)
reserved_blocks = self.reserved_blocks
# Remove reserved blocks from all blocks.
remain_range = remain_range.get_subtract_with_other(reserved_blocks)
# Divide all blocks into zero_blocks
# (if there are many) and nonzero_blocks.
zero_blocks_list = []
nonzero_blocks_list = []
nonzero_groups_list = []
default_zero_block = ('\0' * self.block_size).encode()
nonzero_blocks_list, nonzero_groups_list, zero_blocks_list = \
self.apply_remain_range(
default_zero_block, image_file_r, nonzero_blocks_list,
nonzero_groups_list, remain_range, zero_blocks_list)
temp_file_map = self.get_file_map(
nonzero_blocks_list, nonzero_groups_list,
reserved_blocks, temp_file_map, zero_blocks_list)
self.file_map = temp_file_map
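# Shape of one .map line consumed above (illustrative; the range syntax
# is whatever BlocksManager parses and is a guess here). split(None, 1)
# cuts at the first whitespace run, so mapped paths cannot contain spaces.
#     each_line     = "/system/bin/init 0-127 512-1023\n"
#     each_map_path = "/system/bin/init"
#     ranges_value  = "0-127 512-1023\n"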
def apply_remain_range(self, *args):
"""
Traverse remain_range and classify each block as zero or nonzero.
"""
default_zero_block, image_file_r,\
nonzero_blocks_list, nonzero_groups_list,\
remain_range, zero_blocks_list = args
for start_value, end_value in remain_range:
for each_value in range(start_value, end_value):
# Use bisect to binary-search the position of each_value
# in self.offset_index.
idx = bisect.bisect_right(self.offset_index, each_value) - 1
chunk_start, _, file_pos, fill_data = \
self.offset_value_list[idx]
data = self.get_file_data(self.block_size, chunk_start,
default_zero_block, each_value,
file_pos, fill_data, image_file_r)
zero_blocks_list, nonzero_blocks_list, nonzero_groups_list = \
self.get_zero_nonzero_blocks_list(
data, default_zero_block, each_value,
nonzero_blocks_list, nonzero_groups_list,
zero_blocks_list)
return nonzero_blocks_list, nonzero_groups_list, zero_blocks_list
@staticmethod
def get_file_map(*args):
"""
Obtain the file map.
nonzero_blocks_list: nonzero blocks list,
nonzero_groups_list: nonzero groups list,
reserved_blocks: reserved blocks,
temp_file_map: temporary file map,
zero_blocks_list: zero blocks list
:return temp_file_map: file map
"""
nonzero_blocks_list, nonzero_groups_list,\
reserved_blocks, temp_file_map, zero_blocks_list = args
if nonzero_blocks_list:
nonzero_groups_list.append(nonzero_blocks_list)
if zero_blocks_list:
temp_file_map[FILE_MAP_ZERO_KEY] = \
BlocksManager(range_data=zero_blocks_list)
if nonzero_groups_list:
for i, blocks in enumerate(nonzero_groups_list):
temp_file_map["%s-%d" % (FILE_MAP_NONZERO_KEY, i)] = \
BlocksManager(range_data=blocks)
if reserved_blocks:
temp_file_map[FILE_MAP_COPY_KEY] = reserved_blocks
return temp_file_map
@staticmethod
def get_zero_nonzero_blocks_list(*args):
"""
Get zero_blocks_list, nonzero_blocks_list, and nonzero_groups_list.
data: block data,
default_zero_block: default zero block,
each_value: each value,
nonzero_blocks_list: nonzero_blocks_list,
nonzero_groups_list: nonzero_groups_list,
zero_blocks_list: zero_blocks_list,
:return new_zero_blocks_list: new zero blocks list,
:return new_nonzero_blocks_list: new nonzero blocks list,
:return new_nonzero_groups_list: new nonzero groups list.
"""
data, default_zero_block, each_value,\
nonzero_blocks_list, nonzero_groups_list,\
zero_blocks_list = args
# Check whether the data block is equal to the default zero_blocks.
if data == default_zero_block:
zero_blocks_list.append(each_value)
zero_blocks_list.append(each_value + 1)
else:
nonzero_blocks_list.append(each_value)
nonzero_blocks_list.append(each_value + 1)
# The number of nonzero_blocks is greater than
# or equal to the upper limit.
if len(nonzero_blocks_list) >= MAX_BLOCKS_PER_GROUP:
nonzero_groups_list.append(nonzero_blocks_list)
nonzero_blocks_list = []
new_zero_blocks_list, new_nonzero_blocks_list, \
new_nonzero_groups_list = copy.copy(zero_blocks_list), \
copy.copy(nonzero_blocks_list), \
copy.copy(nonzero_groups_list)
return new_zero_blocks_list, new_nonzero_blocks_list, \
new_nonzero_groups_list
@staticmethod
def get_file_data(*args):
"""
Get the file data.
block_size: block size,
chunk_start: start position of the chunk,
default_zero_block: default zero block,
each_value: each value,
file_pos: file position,
fill_data: fill data,
image_file_r: read file object,
:return data: file data
"""
block_size, chunk_start, default_zero_block, each_value,\
file_pos, fill_data, image_file_r = args
if file_pos is not None:
file_pos += (each_value - chunk_start) * block_size
image_file_r.seek(file_pos, os.SEEK_SET)
data = image_file_r.read(block_size)
else:
if fill_data == default_zero_block[:4]:
data = default_zero_block
else:
data = None
return data
def range_sha256(self, ranges):
hash_obj = sha256()
for data in self.__get_blocks_set_data(ranges):
hash_obj.update(data)
return hash_obj.hexdigest()
def write_range_data_2_fd(self, ranges, file_obj):
for data in self.__get_blocks_set_data(ranges):
file_obj.write(data)
def get_ranges(self, ranges):
return [each_data for each_data in self.__get_blocks_set_data(ranges)]
def __get_blocks_set_data(self, blocks_set_data):
"""
Get the range data.
"""
with open(self.image_path, 'rb') as f_r:
for start, end in blocks_set_data:
diff_value = end - start
idx = bisect.bisect_right(self.offset_index, start) - 1
chunk_start, chunk_len, file_pos, fill_data = \
self.offset_value_list[idx]
remain = chunk_len - (start - chunk_start)
this_read = min(remain, diff_value)
if file_pos is not None:
pos = file_pos + ((start - chunk_start) * self.block_size)
f_r.seek(pos, os.SEEK_SET)
yield f_r.read(this_read * self.block_size)
else:
yield fill_data * (this_read * (self.block_size >> 2))
diff_value -= this_read
while diff_value > 0:
idx += 1
chunk_start, chunk_len, file_pos, fill_data = \
self.offset_value_list[idx]
this_read = min(chunk_len, diff_value)
if file_pos is not None:
f_r.seek(file_pos, os.SEEK_SET)
yield f_r.read(this_read * self.block_size)
else:
yield fill_data * (this_read * (self.block_size >> 2))
diff_value -= this_read
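# Why fill chunks multiply by block_size >> 2: a fill chunk stores a
# single 4-byte pattern, so producing one block of output repeats it
# block_size / 4 times. Sketch for a 4096-byte block:
#     fill_data = b'\x00\x00\x00\x00'
#     len(fill_data * (4096 >> 2)) == 4096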
@staticmethod
def image_header_info_check(header_info):
"""
Check the parsed header_info fields of the image.
:param header_info: header_info
:return:
"""
image_flag = True
# Sparse image header magic. The value is fixed to 0xED26FF3A.
magic_info = header_info[0]
# major version number
major_version = header_info[1]
# minor version number
minor_version = header_info[2]
# Length of the header information.
# The value is fixed to 28 bytes.
header_info_size = header_info[3]
# Header information size of the chunk.
# The value is fixed to 12 bytes.
chunk_header_info_size = header_info[4]
# Number of bytes of a block. The default size is 4096.
block_size = header_info[5]
# Total number of blocks contained in the current image
# (number of blocks in a non-sparse image)
total_blocks = header_info[6]
# Total number of chunks contained in the current image
total_chunks = header_info[7]
if magic_info != SPARSE_IMAGE_MAGIC:
UPDATE_LOGGER.print_log(
"SparseImage head Magic should be 0xED26FF3A!",
UPDATE_LOGGER.WARNING_LOG)
image_flag = False
if major_version != 1 or minor_version != 0:
UPDATE_LOGGER.print_log(
"SparseImage Only supported major version with "
"minor version 1.0!",
UPDATE_LOGGER.WARNING_LOG)
image_flag = False
if header_info_size != 28:
UPDATE_LOGGER.print_log(
"SparseImage header info size must be 28! size: %u." %
header_info_size, UPDATE_LOGGER.WARNING_LOG)
image_flag = False
if chunk_header_info_size != 12:
UPDATE_LOGGER.print_log(
"SparseImage Chunk header size mast to be 12! size: %u." %
chunk_header_info_size, UPDATE_LOGGER.WARNING_LOG)
image_flag = False
return block_size, chunk_header_info_size, header_info_size, \
magic_info, total_blocks, total_chunks, image_flag

BIN
lib/diff Executable file

Binary file not shown.

BIN
lib/libpackage.so (Stored with Git LFS) Executable file

Binary file not shown.

114
log_exception.py Normal file

@ -0,0 +1,114 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
class UpdateToolLogger:
"""
Global log class
"""
INFO_LOG = 'INFO_LOG'
WARNING_LOG = 'WARNING_LOG'
ERROR_LOG = 'ERROR_LOG'
LOG_TYPE = (INFO_LOG, WARNING_LOG, ERROR_LOG)
def __init__(self, output_type='console'):
self.__logger_obj = self.__get_logger_obj(output_type=output_type)
@staticmethod
def __get_logger_obj(output_type='console'):
ota_logger = logging.getLogger(__name__)
ota_logger.setLevel(level=logging.INFO)
formatter = logging.Formatter(
'%(asctime)s %(levelname)s : %(message)s',
"%Y-%m-%d %H:%M:%S")
if output_type == 'console':
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
ota_logger.addHandler(console_handler)
elif output_type == 'file':
file_handler = logging.FileHandler("UpdateToolLog.txt")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
ota_logger.addHandler(file_handler)
return ota_logger
def print_log(self, msg, log_type=INFO_LOG):
"""
Print log information.
:param msg: log information
:param log_type: log type
:return:
"""
if log_type == self.LOG_TYPE[0]:
self.__logger_obj.info(msg)
elif log_type == self.LOG_TYPE[1]:
self.__logger_obj.warning(msg)
elif log_type == self.LOG_TYPE[2]:
self.__logger_obj.error(msg)
else:
self.__logger_obj.error("Unknown log type! %s", log_type)
return False
return True
def print_uncaught_exception_msg(self, msg, exc_info):
"""
Print log when an uncaught exception occurs.
:param msg: Uncaught exception
:param exc_info: information about the uncaught exception
"""
self.__logger_obj.error(msg, exc_info=exc_info)
UPDATE_LOGGER = UpdateToolLogger()
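# Usage sketch for the singleton above (mirrors call sites in this commit):
#     UPDATE_LOGGER.print_log("image parsed")                        # info
#     UPDATE_LOGGER.print_log("bad magic", UPDATE_LOGGER.WARNING_LOG)
#     UPDATE_LOGGER.print_log("failed", log_type=UPDATE_LOGGER.ERROR_LOG)
# print_log returns True on success and False for an unknown log_type.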
def handle_exception(exc_type, exc_value, exc_traceback):
"""
Override global caught exceptions.
:param exc_type: exception type
:param exc_value: exception value
:param exc_traceback: exception traceback
:return:
"""
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
UPDATE_LOGGER.print_uncaught_exception_msg(
"Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
from utils import clear_resource
clear_resource(err_clear=True)
sys.excepthook = handle_exception
class VendorExpandError(OSError):
"""
Vendor extended exception class.
Raised when a script interface is not overridden by the vendor.
"""
def __init__(self, script_class, func_name):
super().__init__()
self.script_class = script_class
self.func_name = func_name
def __str__(self):
return ('%s Vendor expansion does not override function %s' %
(self.script_class, self.func_name))

572
patch_package_process.py Normal file

@ -0,0 +1,572 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import subprocess
import tempfile
from ctypes import pointer
from log_exception import UPDATE_LOGGER
from blocks_manager import BlocksManager
from transfers_manager import ActionType
from update_package import PkgHeader
from update_package import PkgComponent
from utils import OPTIONS_MANAGER
from utils import DIFF_EXE_PATH
from utils import get_lib_api
NEW_DAT = "new.dat"
PATCH_DAT = "patch.dat"
TRANSFER_LIST = "transfer.list"
class PatchProcess:
def __init__(self, partition, tgt_sparse_image, src_sparse_image,
actions_list):
self.actions_list = actions_list
self.worker_threads = multiprocessing.cpu_count() // 2
self.partition = partition
self.tgt_sparse_img_obj, self.src_sparse_img_obj = \
tgt_sparse_image, src_sparse_image
self.version = 1
self.touched_src_ranges = BlocksManager()
self.touched_src_sha256 = None
self.package_patch_zip = PackagePatchZip(partition)
def patch_process(self):
"""
Generate patches through calculation.
"""
UPDATE_LOGGER.print_log("Patch Process!")
new_dat_file_obj, patch_dat_file_obj, transfer_list_file_obj = \
self.package_patch_zip.get_file_obj()
stashes = {}
total_blocks_count = 0
stashed_blocks = 0
max_stashed_blocks = 0
transfer_content = ["%d\n" % self.version, "TOTAL_MARK\n",
"0\n", "MAX_STASH_MARK\n"]
diff_offset = 0
for each_action in self.actions_list:
max_stashed_blocks, stashed_blocks = self.add_stash_command(
each_action, max_stashed_blocks, stashed_blocks, stashes,
transfer_content)
free_commands_list, free_size, src_str_list = \
self.add_free_command(each_action, stashes)
src_str = " ".join(src_str_list)
tgt_size = each_action.tgt_block_set.size()
if each_action.type_str == ActionType.ZERO:
total_blocks_count = \
self.apply_zero_type(each_action, total_blocks_count,
transfer_content)
elif each_action.type_str == ActionType.NEW:
total_blocks_count = \
self.apply_new_type(each_action, new_dat_file_obj,
tgt_size, total_blocks_count,
transfer_content)
elif each_action.type_str == ActionType.DIFFERENT:
max_stashed_blocks, total_blocks_count, diff_offset = \
self.apply_diff_style(
diff_offset, each_action, max_stashed_blocks,
patch_dat_file_obj, src_str, stashed_blocks, tgt_size,
total_blocks_count, transfer_content)
else:
UPDATE_LOGGER.print_log("Unknown action type: %s!" %
each_action.type_str)
raise RuntimeError
if free_commands_list:
transfer_content.append("".join(free_commands_list))
stashed_blocks -= free_size
self.after_for_process(max_stashed_blocks, total_blocks_count,
transfer_content, transfer_list_file_obj)
def apply_new_type(self, each_action, new_dat_file_obj, tgt_size,
total_blocks_count, transfer_content):
self.tgt_sparse_img_obj.write_range_data_2_fd(
each_action.tgt_block_set, new_dat_file_obj)
UPDATE_LOGGER.print_log("%7s %s %s" % (
each_action.type_str, each_action.tgt_name,
str(each_action.tgt_block_set)))
temp_size = self.write_split_transfers(
transfer_content,
each_action.type_str, each_action.tgt_block_set)
if tgt_size != temp_size:
raise RuntimeError
total_blocks_count += temp_size
return total_blocks_count
def apply_zero_type(self, each_action, total_blocks_count,
transfer_content):
UPDATE_LOGGER.print_log("%7s %s %s" % (
each_action.type_str, each_action.tgt_name,
str(each_action.tgt_block_set)))
to_zero = \
each_action.tgt_block_set.get_subtract_with_other(
each_action.src_block_set)
if self.write_split_transfers(transfer_content, each_action.type_str,
to_zero) != to_zero.size():
raise RuntimeError
total_blocks_count += to_zero.size()
return total_blocks_count
def apply_diff_style(self, *args):
"""
Process actions of the diff type.
"""
diff_offset, each_action, max_stashed_blocks,\
patch_dat_file_obj, src_str, stashed_blocks, tgt_size,\
total_blocks_count, transfer_content = args
if self.tgt_sparse_img_obj. \
range_sha256(each_action.tgt_block_set) == \
self.src_sparse_img_obj.\
range_sha256(each_action.src_block_set):
each_action.type_str = ActionType.MOVE
UPDATE_LOGGER.print_log("%7s %s %s (from %s %s)" % (
each_action.type_str, each_action.tgt_name,
str(each_action.tgt_block_set),
each_action.src_name,
str(each_action.src_block_set)))
max_stashed_blocks, total_blocks_count = \
self.add_move_command(
each_action, max_stashed_blocks, src_str,
stashed_blocks, tgt_size, total_blocks_count,
transfer_content)
else:
do_img_diff, patch_value = self.compute_diff_patch(
each_action, patch_dat_file_obj)
if each_action.src_block_set.is_overlaps(
each_action.tgt_block_set):
temp_stash_usage = \
stashed_blocks + each_action.src_block_set.size()
if temp_stash_usage > max_stashed_blocks:
max_stashed_blocks = temp_stash_usage
self.add_diff_command(diff_offset, do_img_diff,
each_action, patch_value, src_str,
transfer_content)
diff_offset += len(patch_value)
total_blocks_count += tgt_size
return max_stashed_blocks, total_blocks_count, diff_offset
def after_for_process(self, max_stashed_blocks, total_blocks_count,
transfer_content, transfer_list_file_obj):
"""
Finish up after the actions_list loop completes.
:param max_stashed_blocks: maximum number of stashed blocks in actions
:param total_blocks_count: total number of blocks
:param transfer_content: transfer content
:param transfer_list_file_obj: transfer file object
:return:
"""
self.touched_src_sha256 = self.src_sparse_img_obj.range_sha256(
self.touched_src_ranges)
if self.tgt_sparse_img_obj.extended_range:
if self.write_split_transfers(
transfer_content, ActionType.ZERO,
self.tgt_sparse_img_obj.extended_range) != \
self.tgt_sparse_img_obj.extended_range.size():
raise RuntimeError
total_blocks_count += self.tgt_sparse_img_obj.extended_range.size()
all_tgt = BlocksManager(
range_data=(0, self.tgt_sparse_img_obj.total_blocks))
all_tgt_minus_extended = all_tgt.get_subtract_with_other(
self.tgt_sparse_img_obj.extended_range)
new_not_care = all_tgt_minus_extended.get_subtract_with_other(
self.tgt_sparse_img_obj.care_block_range)
self.add_erase_content(new_not_care, transfer_content)
transfer_content = self.get_transfer_content(
max_stashed_blocks, total_blocks_count, transfer_content)
transfer_list_file_obj.write(transfer_content.encode())
@staticmethod
def get_transfer_content(max_stashed_blocks, total_blocks_count,
transfer_content):
"""
Get the transfer content.
"""
transfer_content = ''.join(transfer_content)
transfer_content = \
transfer_content.replace("TOTAL_MARK", str(total_blocks_count))
transfer_content = \
transfer_content.replace("MAX_STASH_MARK", str(max_stashed_blocks))
transfer_content = \
transfer_content.replace("ActionType.MOVE", "move")
transfer_content = \
transfer_content.replace("ActionType.ZERO", "zero")
transfer_content = \
transfer_content.replace("ActionType.NEW", "new")
return transfer_content
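# Resulting transfer-list shape after substitution (illustrative; the
# block-range syntax comes from BlocksManager.to_string_raw and is a
# guess here, hashes truncated):
#     1                     <- self.version
#     262144                <- TOTAL_MARK: total blocks written
#     0
#     520                   <- MAX_STASH_MARK: peak stashed blocks
#     stash 9f86d0... 2,0,520
#     move 9f86d0... 2,0,520 520 2,0,520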
def add_diff_command(self, *args):
"""
Add the diff command.
"""
diff_offset, do_img_diff, each_action,\
patch_value, src_str, transfer_content = args
self.touched_src_ranges = self.touched_src_ranges.get_union_with_other(
each_action.src_block_set)
diff_type = "imgdiff" if do_img_diff else "bsdiff"
transfer_content.append("%s %d %d %s %s %s %s\n" % (
diff_type,
diff_offset, len(patch_value),
self.src_sparse_img_obj.range_sha256(each_action.src_block_set),
self.tgt_sparse_img_obj.range_sha256(each_action.tgt_block_set),
each_action.tgt_block_set.to_string_raw(), src_str))
def compute_diff_patch(self, each_action, patch_dat_file_obj):
"""
Run the command to calculate the differential patch.
"""
src_file_obj = \
tempfile.NamedTemporaryFile(prefix="src-", mode='wb')
self.src_sparse_img_obj.write_range_data_2_fd(
each_action.src_block_set, src_file_obj)
src_file_obj.seek(0)
tgt_file_obj = tempfile.NamedTemporaryFile(
prefix="tgt-", mode='wb')
self.tgt_sparse_img_obj.write_range_data_2_fd(
each_action.tgt_block_set, tgt_file_obj)
tgt_file_obj.seek(0)
OPTIONS_MANAGER.incremental_temp_file_obj_list.append(
src_file_obj)
OPTIONS_MANAGER.incremental_temp_file_obj_list.append(
tgt_file_obj)
do_img_diff = True if \
each_action.tgt_name.split(".")[-1].lower() in \
("zip", "gz", "lz4", "hap") else False
try:
patch_value, do_img_diff = self.apply_compute_patch(
src_file_obj.name, tgt_file_obj.name, do_img_diff)
except ValueError:
UPDATE_LOGGER.print_log("Patch process Failed!")
UPDATE_LOGGER.print_log("%7s %s %s (from %s %s)" % (
each_action.type_str, each_action.tgt_name,
str(each_action.tgt_block_set),
each_action.src_name,
str(each_action.src_block_set)),
UPDATE_LOGGER.ERROR_LOG)
raise ValueError
patch_dat_file_obj.write(patch_value)
return do_img_diff, patch_value
def add_move_command(self, *args):
"""
Add the move command.
"""
each_action, max_stashed_blocks, src_str,\
stashed_blocks, tgt_size, total_blocks_count,\
transfer_content = args
src_block_set = each_action.src_block_set
tgt_block_set = each_action.tgt_block_set
if src_block_set != tgt_block_set:
if src_block_set.is_overlaps(tgt_block_set):
temp_stash_usage = stashed_blocks + \
src_block_set.size()
if temp_stash_usage > max_stashed_blocks:
max_stashed_blocks = temp_stash_usage
self.touched_src_ranges = \
self.touched_src_ranges.get_union_with_other(src_block_set)
transfer_content.append(
"{type_str} {tgt_hash} {tgt_string} {src_str}\n".
format(type_str=each_action.type_str,
tgt_hash=self.tgt_sparse_img_obj.
range_sha256(each_action.tgt_block_set),
tgt_string=tgt_block_set.to_string_raw(),
src_str=src_str))
total_blocks_count += tgt_size
return max_stashed_blocks, total_blocks_count
def add_free_command(self, each_action, stashes):
"""
Add the free command.
:param each_action: action object to be processed
:param stashes: Stash dict
:return: free_commands_list, free_size, src_str_list
"""
free_commands_list = []
free_size = 0
src_blocks_size = each_action.src_block_set.size()
src_str_list = [str(src_blocks_size)]
un_stashed_src_ranges = each_action.src_block_set
mapped_stashes = []
for _, each_stash_before in each_action.use_stash:
un_stashed_src_ranges = \
un_stashed_src_ranges.get_subtract_with_other(
each_stash_before)
src_range_sha = \
self.src_sparse_img_obj.range_sha256(each_stash_before)
each_stash_before = \
each_action.src_block_set.get_map_within(each_stash_before)
mapped_stashes.append(each_stash_before)
if src_range_sha not in stashes:
raise RuntimeError
src_str_list.append(
"%s:%s" % (src_range_sha, each_stash_before.to_string_raw()))
stashes[src_range_sha] -= 1
if stashes[src_range_sha] == 0:
free_commands_list.append("free %s\n" % (src_range_sha,))
free_size += each_stash_before.size()
stashes.pop(src_range_sha)
self.apply_stashed_range(each_action, mapped_stashes, src_blocks_size,
src_str_list, un_stashed_src_ranges)
return free_commands_list, free_size, src_str_list
def apply_stashed_range(self, *args):
each_action, mapped_stashes, src_blocks_size,\
src_str_list, un_stashed_src_ranges = args
if un_stashed_src_ranges.size() != 0:
src_str_list.insert(1, un_stashed_src_ranges.to_string_raw())
if each_action.use_stash:
mapped_un_stashed = each_action.src_block_set.get_map_within(
un_stashed_src_ranges)
src_str_list.insert(2, mapped_un_stashed.to_string_raw())
mapped_stashes.append(mapped_un_stashed)
self.check_partition(
BlocksManager(range_data=(0, src_blocks_size)),
mapped_stashes)
else:
src_str_list.insert(1, "-")
self.check_partition(
BlocksManager(range_data=(0, src_blocks_size)), mapped_stashes)
def add_stash_command(self, each_action, max_stashed_blocks,
stashed_blocks, stashes, transfer_content):
"""
Add the stash command.
:param each_action: action object to be processed
:param max_stashed_blocks: number of max stash blocks in all actions
:param stashed_blocks: number of stash blocks
:param stashes: Stash dict
:param transfer_content: transfer content list
:return: max_stashed_blocks, stashed_blocks
"""
for _, each_stash_before in each_action.stash_before:
src_range_sha = \
self.src_sparse_img_obj.range_sha256(each_stash_before)
if src_range_sha in stashes:
stashes[src_range_sha] += 1
else:
stashes[src_range_sha] = 1
stashed_blocks += each_stash_before.size()
self.touched_src_ranges = \
self.touched_src_ranges.\
get_union_with_other(each_stash_before)
transfer_content.append("stash %s %s\n" % (
src_range_sha, each_stash_before.to_string_raw()))
if stashed_blocks > max_stashed_blocks:
max_stashed_blocks = stashed_blocks
return max_stashed_blocks, stashed_blocks
def write_script(self, partition, script_check_cmd_list,
script_write_cmd_list, verse_script):
"""
Add command content to the script.
:param partition: image name
:param script_check_cmd_list: incremental check command list
:param script_write_cmd_list: incremental write command list
:param verse_script: verse script object
:return:
"""
ranges_str = self.touched_src_ranges.to_string_raw()
expected_sha = self.touched_src_sha256
sha_check_cmd = verse_script.sha_check(
ranges_str, expected_sha, partition)
first_block_check_cmd = verse_script.first_block_check(partition)
abort_cmd = verse_script.abort(partition)
cmd = 'if ({sha_check_cmd} != 0 || ' \
'{first_block_check_cmd} != 0)' \
'{{\n {abort_cmd}}}\n'.format(
sha_check_cmd=sha_check_cmd,
first_block_check_cmd=first_block_check_cmd,
abort_cmd=abort_cmd)
script_check_cmd_list.append(cmd)
block_update_cmd = verse_script.block_update(partition)
cmd = '%s_WRITE_FLAG%s' % (partition, block_update_cmd)
script_write_cmd_list.append(cmd)
def add_erase_content(self, new_not_care, transfer_content):
"""
Add the erase command.
:param new_not_care: blocks that don't need to be cared about
:param transfer_content: transfer content list
:return:
"""
erase_first = new_not_care.\
get_subtract_with_other(self.touched_src_ranges)
if erase_first.size() != 0:
transfer_content.insert(
4, "erase %s\n" % (erase_first.to_string_raw(),))
erase_last = new_not_care.get_subtract_with_other(erase_first)
if erase_last.size() != 0:
transfer_content.append(
"erase %s\n" % (erase_last.to_string_raw(),))
@staticmethod
def check_partition(total, seq):
so_far = BlocksManager()
for i in seq:
if so_far.is_overlaps(i):
raise RuntimeError
so_far = so_far.get_union_with_other(i)
if so_far != total:
raise RuntimeError
@staticmethod
def write_split_transfers(transfer_content, type_str, target_blocks):
"""
Limit each 'new' and 'zero' command to at most 1024 blocks.
:param transfer_content: transfer content list
:param type_str: type of the action to be processed.
:param target_blocks: BlocksManager of the target blocks
:return: total
"""
if type_str not in (ActionType.NEW, ActionType.ZERO):
raise RuntimeError
blocks_limit = 1024
total = 0
while target_blocks.size() != 0:
blocks_to_write = target_blocks.get_first_block_obj(blocks_limit)
transfer_content.append(
"%s %s\n" % (type_str, blocks_to_write.to_string_raw()))
total += blocks_to_write.size()
target_blocks = \
target_blocks.get_subtract_with_other(blocks_to_write)
return total
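# Worked example for write_split_transfers: a 2500-block ZERO target is
# emitted as three commands of 1024, 1024 and 452 blocks, and the
# returned total (2500) is what callers compare against the target size.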
@staticmethod
def apply_compute_patch(src_file, tgt_file, imgdiff=False):
"""
Add command content to the script.
:param src_file: source file name
:param tgt_file: target file name
:param imgdiff: whether to execute imgdiff judgment
:return:
"""
patch_file_obj = \
tempfile.NamedTemporaryFile(prefix="patch-", mode='wb')
OPTIONS_MANAGER.incremental_temp_file_obj_list.append(
patch_file_obj)
cmd = [DIFF_EXE_PATH] if imgdiff else [DIFF_EXE_PATH, '-b', '1']
cmd.extend(['-s', src_file, '-d', tgt_file, '-p', patch_file_obj.name])
sub_p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, _ = sub_p.communicate()
if sub_p.returncode != 0:
raise ValueError(output)
with open(patch_file_obj.name, 'rb') as file_read:
patch_content = file_read.read()
return patch_content, imgdiff
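# The subprocess call above expands to roughly the following (temp file
# names hypothetical; DIFF_EXE_PATH is assumed to point at the lib/diff
# binary shipped in this commit):
#     lib/diff -b 1 -s /tmp/src-xxxx -d /tmp/tgt-xxxx -p /tmp/patch-xxxx
# with '-b 1' dropped when imgdiff handling is requested for
# archive-like payloads (zip/gz/lz4/hap).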
class PackagePatchZip:
"""
Compress the patch files generated by the
differential calculation into a *.zip file.
"""
def __init__(self, partition):
self.partition_new_dat_file_name = "%s.%s" % (partition, NEW_DAT)
self.partition_patch_dat_file_name = "%s.%s" % (partition, PATCH_DAT)
self.partition_transfer_file_name = "%s.%s" % (
partition, TRANSFER_LIST)
self.new_dat_file_obj = tempfile.NamedTemporaryFile(
prefix="%s-" % NEW_DAT, mode='wb')
self.patch_dat_file_obj = tempfile.NamedTemporaryFile(
prefix="%s-" % PATCH_DAT, mode='wb')
self.transfer_list_file_obj = tempfile.NamedTemporaryFile(
prefix="%s-" % TRANSFER_LIST, mode='wb')
OPTIONS_MANAGER.incremental_temp_file_obj_list.append(
self.new_dat_file_obj)
OPTIONS_MANAGER.incremental_temp_file_obj_list.append(
self.patch_dat_file_obj)
OPTIONS_MANAGER.incremental_temp_file_obj_list.append(
self.transfer_list_file_obj)
self.partition_file_obj = \
tempfile.NamedTemporaryFile(prefix="partition_patch-")
self.partition_head_list = PkgHeader()
pkg_components = PkgComponent * 3
self.partition_component_list = pkg_components()
OPTIONS_MANAGER.incremental_image_file_obj_list.append(
self.partition_file_obj)
self.set_package_file_args()
def get_file_obj(self):
"""
Obtain file objects.
"""
return self.new_dat_file_obj, self.patch_dat_file_obj, \
self.transfer_list_file_obj
def set_package_file_args(self):
"""
Set Diff patch calculation and packaging parameters.
"""
self.partition_head_list.digest_method = 0
self.partition_head_list.sign_method = 0
self.partition_head_list.pkg_type = 2
self.partition_head_list.entry_count = 3
self.partition_component_list[0].file_path = \
self.new_dat_file_obj.name.encode("utf-8")
self.partition_component_list[0].component_addr = \
self.partition_new_dat_file_name.encode("utf-8")
self.partition_component_list[1].file_path = \
self.patch_dat_file_obj.name.encode("utf-8")
self.partition_component_list[1].component_addr = \
self.partition_patch_dat_file_name.encode("utf-8")
def package_patch_zip(self):
"""
Compress the partition diff patch data into a *.zip package.
"""
self.partition_file_obj.seek(0)
self.patch_dat_file_obj.seek(0)
self.new_dat_file_obj.seek(0)
self.transfer_list_file_obj.seek(0)
self.partition_component_list[2].file_path = \
self.transfer_list_file_obj.name.encode("utf-8")
self.partition_component_list[2].component_addr = \
self.partition_transfer_file_name.encode("utf-8")
lib = get_lib_api()
lib.CreatePackage(pointer(self.partition_head_list),
self.partition_component_list,
self.partition_file_obj.name.encode("utf-8"),
OPTIONS_MANAGER.private_key.encode("utf-8"))

412
script_generator.py Normal file

@ -0,0 +1,412 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Create script file for updater
"""
import os
import re
import tempfile
from decimal import getcontext
from decimal import Decimal
from log_exception import VendorExpandError
from log_exception import UPDATE_LOGGER
from utils import OPTIONS_MANAGER
from utils import PARTITION_FILE
from utils import TWO_STEP
from utils import TOTAL_SCRIPT_FILE_NAME
from utils import SCRIPT_FILE_NAME
from utils import SCRIPT_KEY_LIST
class Script:
def __init__(self):
self.script = []
self.version = 0
self.info = {}
def add_command(self, cmd=None):
"""
Add command content to the script.
:param cmd: command content
:return:
"""
self.script.append(cmd)
def get_script(self):
"""
Get the script list.
:return: script list
"""
return self.script
def sha_check(self, *args, **kwargs):
raise VendorExpandError(type(self), 'sha_check')
def first_block_check(self, *args, **kwargs):
raise VendorExpandError(type(self), 'first_block_check')
def abort(self, *args, **kwargs):
raise VendorExpandError(type(self), 'abort')
def show_progress(self, *args, **kwargs):
raise VendorExpandError(type(self), 'show_progress')
def block_update(self, *args, **kwargs):
raise VendorExpandError(type(self), 'block_update')
def sparse_image_write(self, *args, **kwargs):
raise VendorExpandError(type(self), 'sparse_image_write')
def raw_image_write(self, *args, **kwargs):
raise VendorExpandError(type(self), 'raw_image_write')
def get_status(self, *args, **kwargs):
raise VendorExpandError(type(self), 'get_status')
def set_status(self, *args, **kwargs):
raise VendorExpandError(type(self), 'set_status')
def reboot_now(self, *args, **kwargs):
raise VendorExpandError(type(self), 'reboot_now')
def updater_partitions(self, *args, **kwargs):
raise VendorExpandError(type(self), 'updater_partitions')
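# Sketch of the expansion contract: a vendor subclass must override every
# hook it needs, otherwise calling the hook raises VendorExpandError, e.g.:
#     class IncompleteVendorScript(Script):
#         pass
#     IncompleteVendorScript().abort()
#     # -> VendorExpandError: <class '...IncompleteVendorScript'> Vendor
#     #    expansion does not override function abort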
class PreludeScript(Script):
def __init__(self):
super().__init__()
class VerseScript(Script):
def __init__(self):
super().__init__()
def sha_check(self, ranges_str, expected_sha, partition):
"""
Get the sha_check command.
:param ranges_str: ranges string
:param expected_sha: hash value
:param partition: image name
:return:
"""
cmd = ('sha_check("/{partition}", "{ranges_str}", '
'"{expected_sha}")').format(
ranges_str=ranges_str,
expected_sha=expected_sha, partition=partition)
return cmd
def first_block_check(self, partition):
"""
Get the first_block_check command.
:param partition: image name
:return:
"""
cmd = 'first_block_check("/{partition}")'.format(
partition=partition)
return cmd
def abort(self, partition):
"""
Get the abort command.
:param partition: image name
:return:
"""
cmd = 'abort("ERROR: {partition} partition ' \
'fails to incremental check!");\n'.format(
partition=partition)
return cmd
def show_progress(self, start_progress, dur):
"""
Get the show_progress command.
'dur' may be zero to advance the progress via SetProgress
:param start_progress: start progress
:param dur: seconds
:return:
"""
cmd = 'show_progress({start_progress}, {dur});\n'.format(
start_progress=start_progress, dur=dur)
return cmd
def block_update(self, partition):
"""
Get the block_update command.
:param partition: image name
:return:
"""
cmd = 'block_update("/{partition}", ' \
'"{partition}.transfer.list", "{partition}.new.dat", ' \
'"{partition}.patch.dat");\n'.format(partition=partition)
return cmd
def sparse_image_write(self, partition):
"""
Get the sparse_image_write command.
:param partition: image name
:return:
"""
cmd = 'sparse_image_write("/%s");\n' % partition
return cmd
def raw_image_write(self, partition):
"""
Get the raw_image_write command.
:param partition: image name
:return:
"""
cmd = 'raw_image_write("/%s");\n' % partition
return cmd
def get_status(self):
"""
Get the get_status command.
:return:
"""
cmd = 'get_status("/misc")'
return cmd
def set_status(self, status_value):
"""
Get the set_status command.
:param status_value: status value to be set
:return:
"""
cmd = 'set_status("/misc", %s);\n' % status_value
return cmd
def reboot_now(self):
"""
Get the reboot_now command.
:return:
"""
cmd = 'reboot_now();\n'
return cmd
def updater_partitions(self):
"""
Get the updater_partitions command.
:return:
"""
cmd = 'update_partitions("/%s");\n' % PARTITION_FILE
return cmd
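# Putting the builders together (sketch): a full-update verse script for
# a "system" image accumulates text like
#     show_progress(0.3, 0);
#     sparse_image_write("/system");
# via vs.add_command(vs.show_progress(0.3, 0)) and
# vs.add_command(vs.sparse_image_write("system")).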
class RefrainScript(Script):
def __init__(self):
super().__init__()
class EndingScript(Script):
def __init__(self):
super().__init__()
def write_script(script_content, opera_name):
"""
Generate the {opera}script.
:param script_content: script content
:param opera_name: opera phase name corresponding to the script content:
'prelude', 'verse', 'refrain', or 'ending'
:return:
"""
script_file = tempfile.NamedTemporaryFile(mode='w+')
script_file.write(script_content)
script_file.seek(0)
script_file_name = ''.join([opera_name.title(), SCRIPT_FILE_NAME])
OPTIONS_MANAGER.opera_script_file_name_dict[opera_name].\
append((script_file_name, script_file))
UPDATE_LOGGER.print_log("%s generation complete!" % script_file_name)
def generate_total_script():
"""
Generate the overall script.
"""
content_list = []
for each_key, each_value in \
OPTIONS_MANAGER.opera_script_file_name_dict.items():
for each in each_value:
each_content = "LoadScript(\"%s\", %s);" % \
(each[0], SCRIPT_KEY_LIST.index(each_key))
content_list.append(each_content)
script_total = tempfile.NamedTemporaryFile(mode='w+')
script_total.write('\n'.join(content_list))
script_total.seek(0)
OPTIONS_MANAGER.total_script_file_obj = script_total
UPDATE_LOGGER.print_log("%s generation complete!" % TOTAL_SCRIPT_FILE_NAME)
def get_progress_value(distributable_value=100):
"""
Allocate a progress value to each image update.
:param distributable_value: distributable value
:return:
"""
progress_value_dict = {}
full_img_list = OPTIONS_MANAGER.full_img_list
incremental_img_list = OPTIONS_MANAGER.incremental_img_list
file_size_list = []
if len(full_img_list) == 0 and len(incremental_img_list) == 0:
UPDATE_LOGGER.print_log(
"get progress value failed! > getting progress value failed!",
UPDATE_LOGGER.ERROR_LOG)
return False
for idx, _ in enumerate(incremental_img_list):
# Obtain the size of the incremental image file.
if OPTIONS_MANAGER.two_step and incremental_img_list[idx] == TWO_STEP:
# Updater images are not involved in progress calculation.
incremental_img_list.remove(TWO_STEP)
continue
file_obj = OPTIONS_MANAGER.incremental_image_file_obj_list[idx]
each_img_size = os.path.getsize(file_obj.name)
file_size_list.append(each_img_size)
for idx, _ in enumerate(full_img_list):
# Obtain the size of the full image file.
if OPTIONS_MANAGER.two_step and full_img_list[idx] == TWO_STEP:
# Updater images are not involved in progress calculation.
continue
file_obj = OPTIONS_MANAGER.full_image_file_obj_list[idx]
each_img_size = os.path.getsize(file_obj.name)
file_size_list.append(each_img_size)
if OPTIONS_MANAGER.two_step and TWO_STEP in full_img_list:
full_img_list.remove(TWO_STEP)
proportion_value_list = get_proportion_value_list(
file_size_list, distributable_value=distributable_value)
adjusted_proportion_value_list = adjust_proportion_value_list(
proportion_value_list, distributable_value)
all_img_list = incremental_img_list + full_img_list
current_progress = 40
for idx, each_img in enumerate(all_img_list):
temp_progress = current_progress + adjusted_proportion_value_list[idx]
progress_value_dict[each_img] = (current_progress, temp_progress)
current_progress = temp_progress
return progress_value_dict
def get_proportion_value_list(file_size_list, distributable_value=100):
"""
Obtain the calculated progress proportion value list
(proportion_value_list).
:param file_size_list: file size list
:param distributable_value: distributable value
:return proportion_value_list: progress proportion value list
"""
sum_size = sum(file_size_list)
getcontext().prec = 2
proportion_value_list = []
for each_size_value in file_size_list:
proportion = Decimal(str(float(each_size_value))) / Decimal(
str(float(sum_size)))
proportion_value = int(
Decimal(str(proportion)) *
Decimal(str(float(distributable_value))))
if proportion_value == 0:
proportion_value = 1
proportion_value_list.append(proportion_value)
return proportion_value_list
def adjust_proportion_value_list(proportion_value_list, distributable_value):
"""
Adjust the calculated progress proportion value list to ensure that
sum is equal to distributable_value.
:param proportion_value_list: calculated progress proportion value list
:param distributable_value: number of distributable progress values
:return proportion_value_list: new progress proportion value list
"""
if len(proportion_value_list) == 0:
return []
sum_proportion_value = sum(proportion_value_list)
if sum_proportion_value > distributable_value:
max_value = max(proportion_value_list)
max_idx = proportion_value_list.index(max_value)
proportion_value_list[max_idx] = \
max_value - (sum_proportion_value - distributable_value)
elif sum_proportion_value < distributable_value:
min_value = min(proportion_value_list)
min_idx = proportion_value_list.index(min_value)
proportion_value_list[min_idx] = \
min_value + (distributable_value - sum_proportion_value)
return proportion_value_list
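# Worked example of the two helpers (hand-checked under the prec=2
# Decimal context set above): images of 700 MiB, 200 MiB and 1 MiB
# sharing 60 progress points first yield raw shares [47, 13, 1] (the
# 1 MiB image is rounded up to the minimum of 1); the sum is 61, so
# adjust_proportion_value_list trims the largest entry to give
# [46, 13, 1], landing exactly on 60.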
def create_script(prelude_script, verse_script,
refrain_script, ending_script):
"""
Generate the script file.
:param prelude_script: prelude script
:param verse_script: verse script
:param refrain_script: refrain script
:param ending_script: ending script
:return:
"""
# Generate the prelude script.
prelude_script.add_command("\n# ---- prelude ----\n")
# Get the distribution progress.
progress_value_dict = get_progress_value()
if progress_value_dict is False:
return False
verse_script_content_list = verse_script.get_script()
updater_content = []
if OPTIONS_MANAGER.two_step:
for idx, each_cmd in enumerate(verse_script_content_list[1:]):
if "/%s" % TWO_STEP in each_cmd:
updater_content.append(each_cmd)
each_cmd = \
'\n'.join(
[' %s' % each for each in each_cmd.split('\n')])
verse_script_content_list[0] = \
verse_script_content_list[0].replace(
"UPDATER_WRITE_FLAG",
"%s\nUPDATER_WRITE_FLAG" % each_cmd)
verse_script_content_list[0] = \
verse_script_content_list[0].replace("UPDATER_WRITE_FLAG", "")
verse_script_content_list[0] = \
verse_script_content_list[0].replace("updater_WRITE_FLAG", "")
for each in updater_content:
verse_script_content_list.remove(each)
verse_script_content = '\n'.join(verse_script_content_list[1:])
else:
verse_script_content = '\n'.join(verse_script_content_list)
for key, value in progress_value_dict.items():
show_progress_content = \
verse_script.show_progress((value[1] - value[0]) / 100, 0)
verse_script_content = \
re.sub(r'%s_WRITE_FLAG' % key, '%s' % show_progress_content,
verse_script_content, count=1)
if OPTIONS_MANAGER.two_step:
verse_script_content = '\n'.join(
[' %s' % each for each in verse_script_content.split('\n')])
verse_script_content = verse_script_content_list[0].replace(
"ALL_WRITE_FLAG", verse_script_content)
# Generate the verse script.
write_script(verse_script_content, 'verse')
# Generate the refrain script.
refrain_script.add_command("\n# ---- refrain ----\n")
# Generate the ending script.
ending_script.add_command("\n# ---- ending ----\n")
generate_total_script()

158
transfers_manager.py Normal file

@ -0,0 +1,158 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description: create actions_list and transfer package
"""
import os
import re
from collections import OrderedDict
from enum import Enum
from blocks_manager import BlocksManager
from log_exception import UPDATE_LOGGER
from utils import FILE_MAP_ZERO_KEY
from utils import FILE_MAP_COPY_KEY
VERSION_NAME_RE = r"[0-9]+"
REPLACE_CONTENT = "#"
class ActionType(Enum):
NEW = 0
ZERO = 1
DIFFERENT = 2
MOVE = 3
class ActionInfo:
def __init__(self, type_str, tgt_name, src_name, tgt_block_set,
src_block_set):
self.type_str = type_str
self.tgt_name = tgt_name
self.src_name = src_name
self.tgt_block_set = tgt_block_set
if src_block_set is not None:
self.src_block_set = src_block_set
else:
self.src_block_set = BlocksManager()
self.child = OrderedDict()
self.parent = OrderedDict()
self.stash_before = []
self.use_stash = []
def get_max_block_number(self):
if self.src_block_set and self.src_block_set.size() != 0:
return max(self.src_block_set.range_data)
else:
return 0
def net_stash_change(self):
return (sum(sr.size() for (_, sr) in self.stash_before) -
sum(sr.size() for (_, sr) in self.use_stash))
class TransfersManager(object):
def __init__(self, partition, tgt_sparse_img_obj, src_sparse_img_obj,
disable_img_diff=False):
self.tgt_sparse_img_obj = tgt_sparse_img_obj
self.src_sparse_img_obj = src_sparse_img_obj
self.partition = partition
self.disable_img_diff = disable_img_diff
self.action_file_list = []
@staticmethod
def simplify_file_name(file_name):
base_name = os.path.basename(file_name)
no_version_name = re.sub(VERSION_NAME_RE, REPLACE_CONTENT, base_name)
return base_name, no_version_name
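# How the fuzzy pairing works: digit runs are masked with '#', so two
# versions of the same file still match by pattern, e.g.
#     re.sub(VERSION_NAME_RE, REPLACE_CONTENT, "demo-2.0.101.hap")
#     # -> 'demo-#.#.#.hap'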
def arrange_source_file(self):
base_names = {}
version_patterns = {}
for file_name in self.src_sparse_img_obj.file_map.keys():
base_name, no_version_name = self.simplify_file_name(file_name)
base_names[base_name] = file_name
version_patterns[no_version_name] = file_name
return base_names, version_patterns
def find_process_needs(self):
"""
generate action_list
"""
src_base_names, src_version_patterns = self.arrange_source_file()
max_size = -1
for tgt_file_name, tgt_blocks in \
self.tgt_sparse_img_obj.file_map.items():
if FILE_MAP_ZERO_KEY == tgt_file_name:
UPDATE_LOGGER.print_log("Apply ZERO type!")
self.action_file_list.append(
ActionInfo(
ActionType.ZERO, tgt_file_name, FILE_MAP_ZERO_KEY,
tgt_blocks, self.src_sparse_img_obj.
file_map.get(FILE_MAP_ZERO_KEY, None)))
continue
if FILE_MAP_COPY_KEY == tgt_file_name:
UPDATE_LOGGER.print_log("Apply COPY type!")
self.action_file_list.append(
ActionInfo(ActionType.NEW, tgt_file_name,
None, tgt_blocks, None))
continue
if tgt_file_name in self.src_sparse_img_obj.file_map:
UPDATE_LOGGER.print_log("Apply DIFF type!")
action_info = ActionInfo(
ActionType.DIFFERENT, tgt_file_name, tgt_file_name,
tgt_blocks,
self.src_sparse_img_obj.file_map[tgt_file_name])
max_size = action_info.get_max_block_number() \
if action_info.get_max_block_number() > \
max_size else max_size
self.action_file_list.append(action_info)
continue
src_file_name = self.get_file_name(
src_base_names, src_version_patterns, tgt_file_name)
if src_file_name:
action_info = ActionInfo(
ActionType.DIFFERENT, tgt_file_name, src_file_name,
tgt_blocks,
self.src_sparse_img_obj.file_map[src_file_name])
max_size = action_info.get_max_block_number() if \
action_info.get_max_block_number() > max_size else max_size
self.action_file_list.append(action_info)
continue
self.action_file_list.append(
ActionInfo(ActionType.NEW, tgt_file_name,
None, tgt_blocks, None))
return max_size
def get_file_name(self, src_base_names, src_version_patterns,
tgt_file_name):
tgt_base_name, tgt_version_patterns = \
self.simplify_file_name(tgt_file_name)
has_diff_name = True if tgt_base_name in src_base_names else False
has_diff_version = \
True if tgt_version_patterns in src_version_patterns else False
src_file_name = \
src_base_names[tgt_base_name] if has_diff_name else \
src_version_patterns[tgt_version_patterns] if \
has_diff_version else None
return src_file_name
def get_action_list(self):
return self.action_file_list

441
update_package.py Normal file

@ -0,0 +1,441 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import os
import subprocess
import tempfile
import time
from collections import OrderedDict
from ctypes import Structure
from ctypes import c_int
from ctypes import c_char_p
from ctypes import c_ubyte
from ctypes import pointer
from enum import Enum
from log_exception import UPDATE_LOGGER
from script_generator import create_script
from utils import OPTIONS_MANAGER
from utils import ON_SERVER
from utils import SCRIPT_KEY_LIST
from utils import EXTEND_OPTIONAL_COMPONENT_LIST
from utils import COMPONENT_INFO_INNIT
from utils import UPDATE_EXE_FILE_NAME
from utils import TOTAL_SCRIPT_FILE_NAME
from utils import EXTEND_COMPONENT_LIST
from utils import LINUX_HASH_ALGORITHM_DICT
from utils import BUILD_TOOLS_FILE_NAME
from utils import get_lib_api
IS_DEL = 0
class SignMethod(Enum):
RSA = 1
ECC = 2
class PkgHeader(Structure):
_fields_ = [("digest_method", c_ubyte),
("sign_method", c_ubyte),
("pkg_type", c_ubyte),
("entry_count", c_int),
("update_file_version", c_int),
("product_update_id", c_char_p),
("software_version", c_char_p),
("date", c_char_p),
("time", c_char_p)]
class PkgComponent(Structure):
_fields_ = [("digest", c_ubyte * 32),
("file_path", c_char_p),
("component_addr", c_char_p),
("version", c_char_p),
("size", c_int),
("id", c_int),
("original_size", c_int),
("res_type", c_ubyte),
("type", c_ubyte),
("flags", c_ubyte)]
def create_update_bin():
"""
Call the interface to generate the update.bin file.
:return update_bin_obj, lib: update file object and packaging library.
If an exception occurs, return False.
"""
update_bin_obj = tempfile.NamedTemporaryFile(prefix="update_bin-")
head_value_list = OPTIONS_MANAGER.head_info_list
component_dict = OPTIONS_MANAGER.component_info_dict
full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
full_img_list = OPTIONS_MANAGER.full_img_list
incremental_img_list = OPTIONS_MANAGER.incremental_img_list
incremental_image_file_obj_list = \
OPTIONS_MANAGER.incremental_image_file_obj_list
all_image_file_obj_list = \
incremental_image_file_obj_list + full_image_file_obj_list
if OPTIONS_MANAGER.partition_file_obj is not None:
all_image_name = \
EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST + \
incremental_img_list + full_img_list
else:
all_image_name = \
EXTEND_COMPONENT_LIST + incremental_img_list + full_img_list
sort_component_dict = OrderedDict()
for each_image_name in all_image_name:
sort_component_dict[each_image_name] = \
component_dict.get(each_image_name)
component_dict = copy.deepcopy(sort_component_dict)
head_list = get_head_list(len(component_dict), head_value_list)
component_list = get_component_list(
all_image_file_obj_list, component_dict)
save_patch = update_bin_obj.name.encode("utf-8")
lib = get_lib_api()
lib.CreatePackage(
pointer(head_list), component_list, save_patch,
OPTIONS_MANAGER.private_key.encode("utf-8"))
if OPTIONS_MANAGER.private_key == ON_SERVER:
offset = 0
signing_package(update_bin_obj.name,
OPTIONS_MANAGER.hash_algorithm, position=offset)
UPDATE_LOGGER.print_log(".bin package signing success!")
UPDATE_LOGGER.print_log(
"Create update package .bin complete! path: %s" % update_bin_obj.name)
return update_bin_obj, lib
def get_component_list(all_image_file_obj_list, component_dict):
"""
Get the list of component information according to
the component information structure.
:param all_image_file_obj_list: all image object file list
:param component_dict: Component information content dict
:return component_list: List of component information.
If exception occurs, return False.
"""
pkg_components = PkgComponent * len(component_dict)
component_list = pkg_components()
if OPTIONS_MANAGER.partition_file_obj is not None:
extend_component_list = \
EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
OPTIONS_MANAGER.board_list_file_path,
OPTIONS_MANAGER.partition_file_obj.name]
else:
extend_component_list = EXTEND_COMPONENT_LIST
extend_path_list = [OPTIONS_MANAGER.version_mbn_file_path,
OPTIONS_MANAGER.board_list_file_path]
idx = 0
for key, component in component_dict.items():
if idx < len(extend_component_list):
file_path = extend_path_list[idx]
else:
file_path = \
all_image_file_obj_list[idx - len(extend_component_list)].name
digest = get_hash_content(file_path, OPTIONS_MANAGER.hash_algorithm)
if digest is None:
return
if component is None:
component = copy.copy(COMPONENT_INFO_INNIT)
component[0] = key
component_list[idx].digest = (c_ubyte * 32).from_buffer_copy(
digest.encode('utf-8'))
component_list[idx].file_path = file_path.encode("utf-8")
component_list[idx].component_addr = \
('/%s' % component[0]).encode("utf-8")
component_list[idx].version = component[4].encode("utf-8")
component_list[idx].size = 0
component_list[idx].id = int(component[1])
component_list[idx].original_size = os.path.getsize(file_path)
component_list[idx].res_type = int(component[2])
component_list[idx].type = int(component[3])
component_list[idx].flags = IS_DEL
idx += 1
return component_list
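# Note: each component_dict value is assumed to be a positional attribute
# list (see COMPONENT_INFO_INNIT in utils.py); the indices consumed above
# map as follows:
#   component[0] -> component address  -> PkgComponent.component_addr
#   component[1] -> component id       -> PkgComponent.id
#   component[2] -> resource type      -> PkgComponent.res_type
#   component[3] -> component type     -> PkgComponent.type
#   component[4] -> version string     -> PkgComponent.version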
def get_head_list(component_count, head_value_list):
"""
Build the update package header according to the header structure.
:param component_count: number of components
:param head_value_list: list of header values
:return head_list: PkgHeader object
"""
head_list = PkgHeader()
head_list.digest_method = 2
if OPTIONS_MANAGER.signing_algorithm == "ECC":
# signing algorithm use ECC
head_list.sign_method = SignMethod.ECC.value
else:
# signing algorithm use RSA
head_list.sign_method = SignMethod.RSA.value
head_list.pkg_type = 1
head_list.entry_count = component_count
head_list.update_file_version = int(head_value_list[0])
head_list.product_update_id = head_value_list[1].encode("utf-8")
head_list.software_version = head_value_list[2].encode("utf-8")
head_list.date = head_value_list[3].encode("utf-8")
head_list.time = head_value_list[4].encode("utf-8")
return head_list
def get_tools_component_list(count, opera_script_dict):
"""
Get the list of component information according to
the component information structure.
:param count: number of components
:param opera_script_dict: script file name and path dict
:return component_list: list of component information
component_num: number of components filled in
"""
pkg_components = PkgComponent * count
component_list = pkg_components()
component_value_list = list(opera_script_dict.keys())
component_num = 0
for i, component in enumerate(component_value_list):
component_list[i].file_path = component.encode("utf-8")
component_list[i].component_addr = \
(opera_script_dict[component]).encode("utf-8")
component_num += 1
return component_list, component_num
def get_tools_head_list(component_count):
"""
Build the build tools package header according to the header structure.
:param component_count: number of components
:return head_list: PkgHeader object
"""
head_list = PkgHeader()
head_list.digest_method = 0
head_list.sign_method = 0
head_list.pkg_type = 2
head_list.entry_count = component_count
return head_list
def get_signing_from_server(package_path, hash_algorithm):
"""
Server-side update package signing requires the vendor to
implement its own signing service interface, for example:
ip = ""
user_name = ""
pass_word = ""
sign_jar = ""
signing_config = [sign_jar, ip, user_name, pass_word,
package_path, hash_algorithm]
subprocess.Popen(
signing_config, shell=False,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
:param package_path: update package file path
:param hash_algorithm: hash algorithm
:return signing_content: signature content from the signing service
"""
UPDATE_LOGGER.print_log("Signing %s, hash algorithm is: %s" %
(package_path, hash_algorithm))
signing_content = ""
return signing_content
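# Illustrative sketch (hypothetical, not part of the tool): one way a vendor
# might implement the signing hook above. The jar path, host, and credentials
# are placeholders that a real deployment must supply.
def _example_vendor_signing(package_path, hash_algorithm):
    sign_jar = "/opt/signer/sign.jar"  # hypothetical signer entry point
    ip = "127.0.0.1"  # hypothetical signing server address
    user_name = "user"  # hypothetical credential
    pass_word = "password"  # hypothetical credential
    signing_config = [sign_jar, ip, user_name, pass_word,
                      package_path, hash_algorithm]
    # With shell=False the argument list is passed directly,
    # so no ' '.join() is needed here.
    process_obj = subprocess.Popen(
        signing_config, shell=False,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    signing_content, _ = process_obj.communicate()
    return signing_content.decode()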
def signing_package(package_path, hash_algorithm,
position=0, package_type='.bin'):
"""
Update package signature.
:param package_path: update package file path
:param hash_algorithm: hash algorithm
:param position: signature write location
:param package_type: package type, '.bin' or '.zip'
:return: True on success
"""
try:
signing_content = get_signing_from_server(
package_path, hash_algorithm)
if position != 0:
with open(package_path, mode='rb+') as f_r:
f_r.seek(position)
f_r.write(signing_content.encode())
else:
with open(package_path, mode='ab') as f_w:
f_w.write(signing_content.encode())
return True
except (OSError, TypeError):
UPDATE_LOGGER.print_log("%s package signing failed!" % package_type)
raise OSError
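# Usage sketch: with the default position=0 the signature is appended to
# the end of the package,
#   signing_package("update.bin", "sha256")
# while a non-zero position overwrites bytes at a fixed signature slot
# (the offset 1024 below is purely illustrative):
#   signing_package("update.zip", "sha256", position=1024,
#                   package_type='.zip')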
def create_build_tools_zip(lib):
"""
Create the build tools package file (scripts plus the updater binary).
:param lib: lib object
:return: build tools package file object; False on failure
"""
opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
tmp_dict = {}
for each in SCRIPT_KEY_LIST:
tmp_dict[each] = []
if opera_script_file_name_dict == tmp_dict:
UPDATE_LOGGER.print_log(
"Script dict is null!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
count = 0
opera_script_dict = {}
for each_value in opera_script_file_name_dict.values():
for each in each_value:
opera_script_dict[each[1].name] = each[0]
count += 1
head_list = get_tools_head_list(count + 2)
component_list, num = \
get_tools_component_list(count + 2, opera_script_dict)
total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
update_exe_path = os.path.join(OPTIONS_MANAGER.target_package_dir,
UPDATE_EXE_FILE_NAME)
if not os.path.exists(update_exe_path):
UPDATE_LOGGER.print_log(
"updater_binary file does not exist!path: %s" % update_exe_path,
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
file_obj = tempfile.NamedTemporaryFile(prefix="build_tools-")
file_save_path = file_obj.name.encode("utf-8")
component_list[num].file_path = update_exe_path.encode("utf-8")
component_list[num].component_addr = \
UPDATE_EXE_FILE_NAME.encode("utf-8")
component_list[num + 1].file_path = \
total_script_file_obj.name.encode("utf-8")
component_list[num + 1].component_addr = \
TOTAL_SCRIPT_FILE_NAME.encode("utf-8")
lib.CreatePackage(
pointer(head_list), component_list, file_save_path,
OPTIONS_MANAGER.private_key.encode("utf-8"))
return file_obj
def build_update_package(no_zip, update_package, prelude_script,
verse_script, refrain_script, ending_script):
"""
Create the update package file.
:param no_zip: whether to output a raw .bin instead of a .zip package
:param update_package: update package path
:param prelude_script: prelude object
:param verse_script: verse object
:param refrain_script: refrain object
:param ending_script: ending object
:return: True on success; False if an exception occurs.
"""
update_bin_obj, lib = create_update_bin()
OPTIONS_MANAGER.update_bin_obj = update_bin_obj
update_file_name = ''.join(
[OPTIONS_MANAGER.product, '_ota_',
time.strftime("%H%M%S", time.localtime())])
if not no_zip:
update_package_path = os.path.join(
update_package, '%s.zip' % update_file_name)
OPTIONS_MANAGER.update_package_file_path = update_package_path
create_script(prelude_script, verse_script,
refrain_script, ending_script)
build_tools_zip_obj = create_build_tools_zip(lib)
if build_tools_zip_obj is False:
UPDATE_LOGGER.print_log(
"Create build tools zip failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
OPTIONS_MANAGER.build_tools_zip_obj = build_tools_zip_obj
head_list = PkgHeader()
head_list.digest_method = 2
if OPTIONS_MANAGER.signing_algorithm == "ECC":
# signing algorithm use ECC
head_list.sign_method = SignMethod.ECC.value
else:
# signing algorithm use RSA
head_list.sign_method = SignMethod.RSA.value
head_list.pkg_type = 2
head_list.entry_count = 2
pkg_components = PkgComponent * 2
component_list = pkg_components()
component_list[0].file_path = \
OPTIONS_MANAGER.update_bin_obj.name.encode("utf-8")
component_list[0].component_addr = 'update.bin'.encode("utf-8")
component_list[1].file_path = \
OPTIONS_MANAGER.build_tools_zip_obj.name.encode("utf-8")
component_list[1].component_addr = \
BUILD_TOOLS_FILE_NAME.encode("utf-8")
lib.CreatePackage(
pointer(head_list), component_list,
update_package_path.encode("utf-8"),
OPTIONS_MANAGER.private_key.encode("utf-8"))
if OPTIONS_MANAGER.private_key == ON_SERVER:
signing_package(update_package_path,
OPTIONS_MANAGER.hash_algorithm,
package_type='.zip')
UPDATE_LOGGER.print_log(".zip package signing success!")
UPDATE_LOGGER.print_log(
"Create update package .bin complete! path: %s" %
update_package_path)
else:
update_package_path = os.path.join(
update_package, '%s.bin' % update_file_name)
OPTIONS_MANAGER.update_package_file_path = update_package_path
with open(OPTIONS_MANAGER.update_bin_obj.name, 'rb') as r_f:
content = r_f.read()
with open(update_package_path, 'wb') as w_f:
w_f.write(content)
return True
def get_hash_content(file_path, hash_algorithm):
"""
Use sha256sum/sha384sum to get the hash value of the file.
:param file_path : file path
:param hash_algorithm: hash algorithm
:return hash_content: hash value
"""
try:
cmd = [LINUX_HASH_ALGORITHM_DICT[hash_algorithm], file_path]
except KeyError:
UPDATE_LOGGER.print_log(
"Unsupported hash algorithm! %s" % hash_algorithm,
log_type=UPDATE_LOGGER.ERROR_LOG)
return None
if not os.path.exists(file_path):
UPDATE_LOGGER.print_log(
"%s failed!" % LINUX_HASH_ALGORITHM_DICT[hash_algorithm],
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
process_obj = subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
hash_content = ''
while process_obj.poll() is None:
line = process_obj.stdout.readline()
line = line.strip()
if line:
# sha256sum/sha384sum output is ASCII, so decode it as UTF-8
hash_content = line.decode(encoding='utf-8').split(' ')[0]
if process_obj.returncode == 0:
UPDATE_LOGGER.print_log(
"Get hash content success! path: %s" % file_path)
return hash_content
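# Usage sketch: get_hash_content("update.bin", "sha256") runs
# `sha256sum update.bin` and returns the leading hex-digest field of its
# output; an unsupported algorithm returns None, and a missing file raises
# RuntimeError.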

573
utils.py Normal file
View File

@ -0,0 +1,573 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Description : Constants and common interfaces
"""
import json
import os
import shutil
import tempfile
import zipfile
from collections import OrderedDict
from copy import copy
from ctypes import cdll
import xmltodict
from cryptography.hazmat.primitives import hashes
from log_exception import UPDATE_LOGGER
operation_path = os.getcwd()
PRODUCT = 'hi3516'
BUILD_TOOLS_FILE_NAME = 'build_tools.zip'
UPDATE_EXE_FILE_NAME = "updater_binary"
SCRIPT_KEY_LIST = ['prelude', 'verse', 'refrain', 'ending']
TOTAL_SCRIPT_FILE_NAME = "loadScript.us"
SCRIPT_FILE_NAME = '-script.us'
UPDATER_CONFIG = "updater_config"
XML_FILE_PATH = "updater_specified_config.xml"
SO_PATH = os.path.join(operation_path, 'lib/libpackage.so')
DIFF_EXE_PATH = os.path.join(operation_path, 'lib/diff')
MISC_INFO_PATH = "misc_info.txt"
VERSION_MBN_PATH = "VERSION.mbn"
BOARD_LIST_PATH = "BOARD.list"
EXTEND_COMPONENT_LIST = ["version_list", "board_list"]
EXTEND_OPTIONAL_COMPONENT_LIST = ["partitions_file"]
PARTITION_FILE = "partitions_file"
IGNORED_PARTITION_LIST = ['fastboot', 'boot', 'kernel', 'misc',
'updater', 'userdata']
SPARSE_IMAGE_MAGIC = 0xED26FF3A
# The related data is the original data of blocks set by chunk_size.
CHUNK_TYPE_RAW = 0xCAC1
# The related data is 4-byte fill data.
CHUNK_TYPE_FILL = 0xCAC2
# The related data is empty.
CHUNK_TYPE_DONT_CARE = 0xCAC3
# CRC32 block
CHUNK_TYPE_CRC32 = 0xCAC4
HASH_ALGORITHM_DICT = {'sha256': hashes.SHA256, 'sha384': hashes.SHA384}
LINUX_HASH_ALGORITHM_DICT = {'sha256': 'sha256sum', 'sha384': 'sha384sum'}
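# Assumed layout of a component info list, matching the indices read in
# get_component_list: [name, id, res_type, type, version].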
COMPONENT_INFO_INNIT = ['', '000', '00', '0', '0o00']
ON_SERVER = "ON_SERVER"
# The length of the header information of the sparse image is 28 bytes.
HEADER_INFO_FORMAT = "<I4H4I"
HEADER_INFO_LEN = 28
# The length of the chunk information of the sparse image is 12 bytes.
CHUNK_INFO_FORMAT = "<2H2I"
CHUNK_INFO_LEN = 12
EXTEND_VALUE = 512
FILE_MAP_ZERO_KEY = "__ZERO"
FILE_MAP_NONZERO_KEY = "__NONZERO"
FILE_MAP_COPY_KEY = "__COPY"
MAX_BLOCKS_PER_GROUP = BLOCK_LIMIT = 1024
PER_BLOCK_SIZE = 4096
TWO_STEP = "updater"
def singleton(cls):
_instance = {}
def _singleton(*args, **kwargs):
if cls not in _instance:
_instance[cls] = cls(*args, **kwargs)
return _instance[cls]
return _singleton
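# Every call to a @singleton-decorated class returns the one cached
# instance; for example, OptionsManager() is OPTIONS_MANAGER holds
# everywhere after this module is imported.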
@singleton
class OptionsManager:
"""
Options management class
"""
def __init__(self):
# Own parameters
self.product = None
# Entry parameters
self.source_package = None
self.target_package = None
self.update_package = None
self.no_zip = False
self.partition_file = None
self.signing_algorithm = None
self.hash_algorithm = None
self.private_key = None
self.make_dir_path = None
# Parsed package parameters
self.target_package_dir = None
self.target_package_config_dir = None
self.target_package_temp_obj = None
self.misc_info_dict = {}
self.version_mbn_file_path = None
self.version_mbn_content = None
self.board_list_file_path = None
self.board_list_content = None
self.source_package_dir = None
self.source_package_temp_obj = None
# XML parsing parameters
self.head_info_list = []
self.component_info_dict = {}
self.full_img_list = []
self.incremental_img_list = []
self.target_package_version = None
self.source_package_version = None
self.two_step = False
self.partition_file_obj = None
# Full processing parameters
self.full_image_content_len_list = []
self.full_image_file_obj_list = []
# Incremental processing parameters
self.incremental_content_len_list = []
self.incremental_image_file_obj_list = []
self.incremental_temp_file_obj_list = []
# Script parameters
self.opera_script_file_name_dict = {}
for each in SCRIPT_KEY_LIST:
self.opera_script_file_name_dict[each] = []
self.total_script_file_obj = None
# Update package parameters
self.update_bin_obj = None
self.build_tools_zip_obj = None
self.update_package_file_path = None
OPTIONS_MANAGER = OptionsManager()
def unzip_package(package_path, origin='target'):
"""
Decompress the zip package.
:param package_path: zip package path
:param origin: package origin, which indicates
whether the zip package is a source package or target package
:return: temporary directory object (tmp_dir_obj) and unzipped
directory path (unzip_dir); (False, False) if an exception occurs.
"""
try:
tmp_dir_obj = tempfile.TemporaryDirectory(prefix="%sfiles-" % origin)
tmp_dir = tmp_dir_obj.name
zf_obj = zipfile.ZipFile(package_path)
for name in zf_obj.namelist():
if name.endswith('/'):
os.mkdir(os.path.join(tmp_dir, name))
else:
ext_filename = os.path.join(
tmp_dir, name)
with open(ext_filename, 'wb') as f_w:
f_w.write(zf_obj.read(name))
except OSError:
UPDATE_LOGGER.print_log(
"Unzip package failed! path: %s" % package_path,
log_type=UPDATE_LOGGER.ERROR_LOG)
return False, False
tmp_dir_list = os.listdir(tmp_dir)
if len(tmp_dir_list) == 1:
unzip_dir = os.path.join(tmp_dir, tmp_dir_list[0])
if UPDATER_CONFIG not in \
os.listdir(unzip_dir):
UPDATE_LOGGER.print_log(
'Unsupported zip package structure!', UPDATE_LOGGER.ERROR_LOG)
return False, False
elif UPDATER_CONFIG in tmp_dir_list:
unzip_dir = tmp_dir
else:
UPDATE_LOGGER.print_log(
'Unsupported zip package structure!', UPDATE_LOGGER.ERROR_LOG)
return False, False
UPDATE_LOGGER.print_log(
'%s package unzip complete! path: %s' % (origin.title(), unzip_dir))
return tmp_dir_obj, unzip_dir
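# Usage sketch: unpack a target package and keep the returned
# TemporaryDirectory object referenced for as long as its files are used:
#   tmp_obj, unzip_dir = unzip_package("target.zip", origin="target")
#   if tmp_obj is False:
#       ...  # unzip failed; both return values are False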
def parse_update_config(xml_path):
"""
Parse the XML configuration file.
:param xml_path: XML configuration file path
:return head_list: header information list of the update package
component_dict: component information dict
whole_list: full image list
difference_list: incremental image list
package_version: software version string
two_step: whether a two-step ("updater") component is present
"""
two_step = False
if os.path.exists(xml_path):
with open(xml_path, 'r') as xml_file:
xml_str = xml_file.read()
else:
UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
xml_path, UPDATE_LOGGER.ERROR_LOG)
return False, False, False, False, False, False
xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
package_dict = xml_content_dict.get('package', {})
head_dict = package_dict.get('head', {}).get('info')
package_version = head_dict.get("@softVersion")
# component
component_info = package_dict.get('group', {}).get('component')
head_list = list(head_dict.values())
head_list.pop()
whole_list = []
difference_list = []
component_dict = {}
expand_component(component_dict)
if isinstance(component_info, OrderedDict):
component_info = [component_info]
if component_info is None:
return [], {}, [], [], '', False
for component in component_info:
component_list = list(component.values())
component_list.pop()
component_dict[component['@compAddr']] = component_list
if component['@compAddr'] in (whole_list + difference_list):
UPDATE_LOGGER.print_log("This component %s repeats!" %
component['@compAddr'],
UPDATE_LOGGER.ERROR_LOG)
return False, False, False, False, False, False
if component['@compType'] == '0':
whole_list.append(component['@compAddr'])
elif component['@compType'] == '1':
difference_list.append(component['@compAddr'])
if component['@compAddr'] == TWO_STEP:
two_step = True
UPDATE_LOGGER.print_log('XML file parsing completed!')
return head_list, component_dict, \
whole_list, difference_list, package_version, two_step
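# Sketch of the XML shape this parser assumes (attribute names are taken
# from the code above; other attributes and values are illustrative):
#   <package>
#     <head><info ... softVersion="OpenHarmony x.y"/></head>
#     <group>
#       <component compAddr="system" compType="0" ...>system.img</component>
#       <component compAddr="vendor" compType="1" ...>vendor.img</component>
#     </group>
#   </package>
# compType "0" marks a full image, "1" an incremental image; a component
# whose compAddr equals TWO_STEP ("updater") switches two_step mode on.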
def partitions_conversion(data):
"""
Convert a start or length value from the partition table
(e.g. '4M') into a count of 512-byte sectors.
:param data: start or length value, '0' or a megabyte value
with an 'M' suffix
:return: sector count; False for an unsupported format
"""
if data == '0':
return 0
elif data.endswith('M'):
return int(data[0:-1]) * 1024 * 1024 // 512
else:
return False
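# Worked example: partitions_conversion('4M') returns
# 4 * 1024 * 1024 // 512 == 8192 sectors; partitions_conversion('0')
# returns 0, and any other format yields False.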
def parse_partition_file_xml(xml_path):
"""
Parse the XML configuration file.
:param xml_path: XML configuration file path
:return file_obj: temporary file object holding the partition JSON
partitions_list: names of the partitions to process
"""
if os.path.exists(xml_path):
with open(xml_path, 'r') as xml_file:
xml_str = xml_file.read()
else:
UPDATE_LOGGER.print_log("XML file does not exist! xml path: %s" %
xml_path, UPDATE_LOGGER.ERROR_LOG)
return False, False
partitions_list = []
xml_content_dict = xmltodict.parse(xml_str, encoding='utf-8')
part_list = xml_content_dict['Partition_Info']['Part']
new_part_list = []
for part in part_list:
start_value = partitions_conversion(part.get('@Start'))
length_value = partitions_conversion(part.get('@Length'))
if start_value is False or length_value is False:
UPDATE_LOGGER.print_log(
"Partition file parsing failed! part_name: %s, xml_path: %s" %
(part.get('@PartitionName'), xml_path),
UPDATE_LOGGER.ERROR_LOG)
return False, False
if part.get('@PartitionName') not in IGNORED_PARTITION_LIST:
partitions_list.append(part.get('@PartitionName'))
part_dict = {'start': start_value,
'length': length_value,
'partName': part.get('@PartitionName'),
'fsType': part.get('@FlashType')}
new_part_list.append(part_dict)
part_json = json.dumps(new_part_list)
part_json = '{"Partition": %s}' % part_json
file_obj = tempfile.NamedTemporaryFile(prefix="partition_file-", mode='wb')
file_obj.write(part_json.encode())
file_obj.seek(0)
return file_obj, partitions_list
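# Sketch of the partition XML shape consumed above (attribute names from
# the code; values illustrative):
#   <Partition_Info>
#     <Part PartitionName="system" FlashType="emmc" Start="4M"
#           Length="512M"/>
#   </Partition_Info>
# Every part is written into the JSON; only names outside
# IGNORED_PARTITION_LIST are returned in partitions_list.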
def expand_component(component_dict):
"""
Append components such as VERSION.mbn and board list.
:param component_dict: component information dict
:return:
"""
if OPTIONS_MANAGER.partition_file is not None:
extend_component_list = \
EXTEND_COMPONENT_LIST + EXTEND_OPTIONAL_COMPONENT_LIST
else:
extend_component_list = EXTEND_COMPONENT_LIST
for each in extend_component_list:
tmp_info_list = copy(COMPONENT_INFO_INNIT)
tmp_info_list[0] = each
component_dict[each] = tmp_info_list
def clear_options():
"""
Clear OPTIONS_MANAGER.
"""
OPTIONS_MANAGER.product = None
# Entry parameters
OPTIONS_MANAGER.source_package = None
OPTIONS_MANAGER.target_package = None
OPTIONS_MANAGER.update_package = None
OPTIONS_MANAGER.no_zip = False
OPTIONS_MANAGER.partition_file = None
OPTIONS_MANAGER.signing_algorithm = None
OPTIONS_MANAGER.hash_algorithm = None
OPTIONS_MANAGER.private_key = None
OPTIONS_MANAGER.make_dir_path = None
# Parsed package parameters
OPTIONS_MANAGER.target_package_dir = None
OPTIONS_MANAGER.target_package_config_dir = None
OPTIONS_MANAGER.target_package_temp_obj = None
OPTIONS_MANAGER.misc_info_dict = {}
OPTIONS_MANAGER.version_mbn_file_path = None
OPTIONS_MANAGER.version_mbn_content = None
OPTIONS_MANAGER.board_list_file_path = None
OPTIONS_MANAGER.board_list_content = None
OPTIONS_MANAGER.source_package_dir = None
OPTIONS_MANAGER.source_package_temp_obj = None
# XML parsing parameters
OPTIONS_MANAGER.head_info_list = []
OPTIONS_MANAGER.component_info_dict = {}
OPTIONS_MANAGER.full_img_list = []
OPTIONS_MANAGER.incremental_img_list = []
OPTIONS_MANAGER.target_package_version = None
OPTIONS_MANAGER.source_package_version = None
OPTIONS_MANAGER.partition_file_obj = None
# Global processing parameters
OPTIONS_MANAGER.full_image_content_len_list = []
OPTIONS_MANAGER.full_image_file_obj_list = []
# Incremental processing parameters
OPTIONS_MANAGER.incremental_content_len_list = []
OPTIONS_MANAGER.incremental_image_file_obj_list = []
OPTIONS_MANAGER.incremental_temp_file_obj_list = []
# Script parameters
OPTIONS_MANAGER.opera_script_file_name_dict = {}
for each in SCRIPT_KEY_LIST:
OPTIONS_MANAGER.opera_script_file_name_dict[each] = []
OPTIONS_MANAGER.total_script_file_obj = None
# Update package parameters
OPTIONS_MANAGER.update_bin_obj = None
OPTIONS_MANAGER.build_tools_zip_obj = None
OPTIONS_MANAGER.update_package_file_path = None
def clear_resource(err_clear=False):
"""
Clear resources, close temporary files, and clear temporary paths.
:param err_clear: whether cleanup is triggered by an error
(also removes generated output files)
:return:
"""
target_package_temp_obj = OPTIONS_MANAGER.target_package_temp_obj
if target_package_temp_obj is not None:
target_package_temp_obj.cleanup()
source_package_temp_obj = OPTIONS_MANAGER.source_package_temp_obj
if source_package_temp_obj is not None:
source_package_temp_obj.cleanup()
partition_file_obj = OPTIONS_MANAGER.partition_file_obj
if partition_file_obj is not None:
partition_file_obj.close()
build_tools_zip_obj = OPTIONS_MANAGER.build_tools_zip_obj
if build_tools_zip_obj is not None:
build_tools_zip_obj.close()
update_bin_obj = OPTIONS_MANAGER.update_bin_obj
if update_bin_obj is not None:
update_bin_obj.close()
total_script_file_obj = OPTIONS_MANAGER.total_script_file_obj
if total_script_file_obj is not None:
total_script_file_obj.close()
full_image_file_obj_list = OPTIONS_MANAGER.full_image_file_obj_list
if len(full_image_file_obj_list) != 0:
for each_full_obj in full_image_file_obj_list:
each_full_obj.close()
clear_file_obj(err_clear)
clear_options()
def clear_file_obj(err_clear):
"""
Clear resources and temporary file objects.
:param err_clear: whether cleanup is triggered by an error
(also removes generated output files)
:return:
"""
incremental_temp_file_obj_list = \
OPTIONS_MANAGER.incremental_temp_file_obj_list
if len(incremental_temp_file_obj_list) != 0:
for each_incremental_temp_obj in incremental_temp_file_obj_list:
if each_incremental_temp_obj is not None:
each_incremental_temp_obj.close()
incremental_image_file_obj_list = \
OPTIONS_MANAGER.incremental_image_file_obj_list
if len(incremental_image_file_obj_list) != 0:
for each_incremental_obj in incremental_image_file_obj_list:
if each_incremental_obj is not None:
each_incremental_obj.close()
opera_script_file_name_dict = OPTIONS_MANAGER.opera_script_file_name_dict
for each_value in opera_script_file_name_dict.values():
for each in each_value:
each[1].close()
if err_clear:
make_dir_path = OPTIONS_MANAGER.make_dir_path
if make_dir_path is not None and os.path.exists(make_dir_path):
shutil.rmtree(make_dir_path)
update_package_file_path = OPTIONS_MANAGER.update_package_file_path
if update_package_file_path is not None and \
os.path.exists(update_package_file_path):
os.remove(update_package_file_path)
UPDATE_LOGGER.print_log(
'Exception occurred. Resource cleaning completed!')
else:
UPDATE_LOGGER.print_log('Resource cleaning completed!')
def get_file_content(file_path, file_name=None):
"""
Read the file content.
:param file_path: file path
:param file_name: file name
:return: file content; False if the file does not exist
"""
if not os.path.exists(file_path):
UPDATE_LOGGER.print_log(
"%s is not exist! path: %s" % (file_name, file_path),
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
with open(file_path, 'r') as r_f:
file_content = r_f.read()
UPDATE_LOGGER.print_log(
"%s file parsing complete! path: %s" % (file_name, file_path))
return file_content
def get_update_info():
"""
Parse the configuration file to obtain the update information.
:return: True if the update information is obtained; False otherwise.
"""
OPTIONS_MANAGER.version_mbn_file_path = os.path.join(
OPTIONS_MANAGER.target_package_config_dir, VERSION_MBN_PATH)
version_mbn_content = get_file_content(
OPTIONS_MANAGER.version_mbn_file_path, VERSION_MBN_PATH)
if version_mbn_content is False:
UPDATE_LOGGER.print_log(
"Get version mbn content failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
OPTIONS_MANAGER.version_mbn_content = version_mbn_content
OPTIONS_MANAGER.board_list_file_path = os.path.join(
OPTIONS_MANAGER.target_package_config_dir, BOARD_LIST_PATH)
board_list_content = get_file_content(
OPTIONS_MANAGER.board_list_file_path, BOARD_LIST_PATH)
if board_list_content is False:
UPDATE_LOGGER.print_log(
"Get board list content failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
OPTIONS_MANAGER.board_list_content = board_list_content
# Parse the XML configuration file.
head_info_list, component_info_dict, \
full_img_list, incremental_img_list, \
OPTIONS_MANAGER.target_package_version, \
OPTIONS_MANAGER.two_step = \
parse_update_config(
os.path.join(OPTIONS_MANAGER.target_package_config_dir,
XML_FILE_PATH))
if head_info_list is False or component_info_dict is False or \
full_img_list is False or incremental_img_list is False:
UPDATE_LOGGER.print_log(
"Get parse update config xml failed!",
log_type=UPDATE_LOGGER.ERROR_LOG)
return False
OPTIONS_MANAGER.head_info_list, OPTIONS_MANAGER.component_info_dict, \
OPTIONS_MANAGER.full_img_list, OPTIONS_MANAGER.incremental_img_list = \
head_info_list, component_info_dict, \
full_img_list, incremental_img_list
return True
def get_lib_api(input_path=None):
"""
Load the packaging library (libpackage.so) API.
:param input_path: optional base directory of the .so file
:return lib: loaded library handle
"""
if input_path is not None:
so_path = "%s/%s" % (input_path, SO_PATH)
else:
so_path = SO_PATH
if not os.path.exists(so_path):
UPDATE_LOGGER.print_log(
"So does not exist! so path: %s" % so_path,
UPDATE_LOGGER.ERROR_LOG)
raise RuntimeError
lib = cdll.LoadLibrary(so_path)
return lib
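# Usage sketch: typical use within this tool chain,
#   lib = get_lib_api()
#   lib.CreatePackage(pointer(head_list), component_list, save_path,
#                     private_key.encode("utf-8"))
# a missing lib/libpackage.so raises RuntimeError before loading.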

57
vendor_script.py Normal file
View File

@ -0,0 +1,57 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from script_generator import Script
from utils import SCRIPT_KEY_LIST
def create_vendor_script_class():
"""
Obtain the extended script objects of the vendor. By default,
the return value is [None] * len(SCRIPT_KEY_LIST).
SCRIPT_KEY_LIST is the stage list in Opera mode.
If needed, rewrite this function to instantiate the Vendor{Opera}Script
classes below and return the objects. Sample code is as follows:
prelude_script = VendorPreludeScript()
verse_script = VendorVerseScript()
refrain_script = VendorRefrainScript()
ending_script = VendorEndingScript()
opera_obj_list = [prelude_script, verse_script,
refrain_script, ending_script]
:return opera_obj_list: {Opera}script object list
"""
opera_obj_list = [None] * len(SCRIPT_KEY_LIST)
return opera_obj_list
class VendorPreludeScript(Script):
def __init__(self):
super().__init__()
class VendorVerseScript(Script):
def __init__(self):
super().__init__()
class VendorRefrainScript(Script):
def __init__(self):
super().__init__()
class VendorEndingScript(Script):
def __init__(self):
super().__init__()