实现了rom/ram分析工具

Signed-off-by: aodongbiao <aodongbiao@huawei.com>
This commit is contained in:
aodongbiao 2023-01-28 17:13:56 +08:00
parent 5511c8d2d4
commit 7cc0e75cb8
27 changed files with 2136 additions and 4 deletions

View File

@ -0,0 +1,42 @@
# rom_ram_analyzer
## 目的
分析各部件的rom占用,结果以xls和json格式进行保存
## 支持产品
目标是支持所有的产品,但是目前由于配置文件没设计好,只支持:ipcamera_hispark_taurus ipcamera_hispark_taurus_linux wifiiot_hispark_pegasus
## 代码思路
1. 扫描BUILD.gn文件,收集各个target的编译产物及其对应的component_name, subsystem_name信息,并存储到config.yaml中的gn_info_file字段指定的json文件中
2. 根据配置文件config.yaml扫描产品的编译产物目录,得到真实的编译产物信息(主要是大小)
3. 用真实的编译产物与从BUILD.gn中收集的信息进行匹配,从而得到编译产物-大小-所属部件的对应信息
4. 如果匹配失败,会直接利用grep到项目路径下进行搜索,尝试利用出现次数最多的BUILD.gn文件进行部件归属
5. 如果还搜索失败,则将其归属到others
## 使用
1. 修改config.yaml
- project_path:oh项目根路径
- output_file:保存最终结果的文件的名字[optional]
2. `python3 rom_analysis.py --product_name {your_product_name} [--recollect_gn bool]`运行代码,其中recollect_gn表示是否需要重新扫描BUILD.gn(为false时直接使用已有结果).eg: `python3 rom_analysis.py --product_name ipcamera_hispark_taurus`
3. 最终会产生4个json文件及一个xls文件,如果是默认配置,各文件描述如下:
- gn_info.json:BUILD.gn的分析结果
- sub_com_info.json:从bundle.json中进行分析获得的各部件及其对应根目录的信息
- {product_name}_product.json:该产品实际的编译产物信息,根据config.yaml进行收集
- {product_name}_result.json:各部件的rom大小分析结果
- {product_name}_result.xls:各部件的rom大小分析结果
## 新增template
主要是在config.py中配置Processor,并在config.yaml中添加相应内容
## 后续工作
1. 配置解耦,目前对config.yaml的解析分散在代码各处,不合理
2. 配置文件按产品优化
3. 部分log的输出有待优化
4. hap_pack需要对hap_name进行处理

View File

@ -0,0 +1,179 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import itertools
import os
import re
import glob
import logging
from deprecated.sphinx import deprecated
from typing import *
# warnings.filterwarnings("always")
def do_nothing(x: Any) -> Any:
    """Identity function used as the default post-processing hook."""
    return x


class BasicTool:
    """Generic helpers: globbing, regex utilities and shell-command wrappers."""
    VERSION = 1.0

    @classmethod
    def contains_keywords(cls, wrapper: Any, key_words: tuple) -> bool:
        """
        Check whether any element of key_words is contained in wrapper.
        :param wrapper: any object supporting the ``x in y`` syntax
        :param key_words: keywords to look for
        :return: True if wrapper contains at least one of the keywords
        """
        # any() short-circuits exactly like the original manual loop did
        return any(k in wrapper for k in key_words)

    @classmethod
    def is_empty_iter(cls, itera: Iterator) -> Tuple[Iterator, bool]:
        """
        Check whether an iterator is empty. Probing consumes the iterator,
        so an equivalent replacement iterator is returned alongside the flag.
        :param itera: iterator to probe
        :return: (iterator with the original content, emptiness flag)
        """
        itera, peek = itertools.tee(itera)
        try:
            next(peek)
        except StopIteration:
            return itera, True
        return itera, False

    @classmethod
    def find_files_with_pattern(cls, folder: str, pattern: str = "/**", recursive: bool = True, apply_abs: bool = True,
                                real_path: bool = True, de_duplicate: bool = True, is_sort: bool = True,
                                post_handler: Callable[[List[str]], List[str]] = None) -> list:
        """
        Glob all files under *folder* matching *pattern* (recursively by default).
        :param folder: directory to search; normalized via cls.abspath (no trailing /)
        :param pattern: glob pattern; must start with '/' because it is appended to folder
        :param recursive: search recursively
        :param apply_abs: convert results to absolute paths
        :param real_path: resolve symlinks to real paths
        :param de_duplicate: remove duplicates
        :param is_sort: sort results case-insensitively
        :param post_handler: optional extra processing; takes and returns a list of file names
        :return: matched file names; the folder itself is removed from the result
        FIXME may hang, possibly caused by symbolic link cycles
        """
        file_list = glob.glob(cls.abspath(folder) + pattern, recursive=recursive)
        if apply_abs:
            file_list = [cls.abspath(f) for f in file_list]
        if real_path:
            file_list = [os.path.realpath(f) for f in file_list]
        if de_duplicate:
            file_list = list(set(file_list))
        if is_sort:
            file_list = sorted(file_list, key=str.lower)
        if post_handler:
            file_list = post_handler(file_list)
        if folder in file_list:
            file_list.remove(folder)
        return file_list

    @classmethod
    def match_paragraph(cls, content: str, start_pattern: str = r"\w+\(\".*?\"\) *{", end_pattern: str = r"\}") -> \
            Iterator[re.Match]:
        """
        Match whole paragraphs (blocks) in content; single-line blocks included.
        Note: the built pattern already matches leading spaces, so
        start_pattern may omit them.
        :param content: string to search
        :param start_pattern: regex for the block opening
        :param end_pattern: regex for the block closing (raw string; was "\\}" with an invalid escape)
        :return: iterator over the matched paragraphs
        """
        ptrn = r'^( *){s}(?#匹配开头).*?(?#中间非贪婪)\1(?#如果开头前面有空格,则结尾的前面应该有相同数量的空格)?{e}$(?#匹配结尾)'.format(
            s=start_pattern, e=end_pattern)
        ptrn = re.compile(ptrn, re.M | re.S)
        return re.finditer(ptrn, content)

    @classmethod
    def re_group_1(cls, content: str, pattern: str, **kwargs) -> str:
        """
        Search pattern in content and return group(1) of the match, or '' if none.
        :param content: string to search
        :param pattern: regex; must contain a capture group
        :raises ValueError: if pattern contains no parentheses
        :return: group(1) of the match, or the empty string
        TODO the parenthesis check should be stricter
        """
        if not (r'(' in pattern and r')' in pattern):
            raise ValueError("parentheses'()' must in the pattern")
        result = re.search(pattern, content, **kwargs)
        return result.group(1) if result else str()

    @classmethod
    def abspath(cls, path: str) -> str:
        """
        Convert path to an absolute path, expanding a leading '~'.
        :param path: path to convert
        :return: absolute path
        """
        return os.path.abspath(os.path.expanduser(path))

    @classmethod
    def grep_ern(cls, pattern: str, path: str, include: str = str(), exclude: tuple = tuple(),
                 post_handler: Callable[[Text], Any] = do_nothing) -> Any:
        """
        Search with the system ``grep -Ern`` command.
        :param pattern: extended regex passed to grep
        :param path: directory to search
        :param include: restrict the search to files matching this glob
        :param exclude: directory names under path that are skipped
        :param post_handler: post-processing applied to grep's raw output
        :return: the post-processed grep output
        TODO replace cls.execute with subprocess
        """
        cmd = f"grep -Ern '{pattern}' '{cls.abspath(path)}'"
        if include:
            cmd += f" --include='{include}'"
        for e in exclude:
            cmd += f" --exclude-dir={e}"
        output = cls.execute(cmd)
        if post_handler:
            output = post_handler(output)
        return output

    @classmethod
    def execute(cls, cmd: str, post_processor: Callable[[Text], Text] = do_nothing) -> Any:
        """
        Run cmd through a shell and return its post-processed stdout.
        :param cmd: command line to execute
        :param post_processor: applied to the raw stdout text
        :return: post-processed stdout
        """
        # use the pipe as a context manager so it is closed deterministically
        # instead of leaking until garbage collection
        with os.popen(cmd) as stream:
            output = stream.read()
        return post_processor(output)
if __name__ == '__main__':
    # Ad-hoc smoke test: grep ohos_shared_library targets under a hard-coded
    # developer path and print only hits inside oh/out.
    for i in BasicTool.grep_ern("^( *)ohos_shared_library", "/home/aodongbiao/oh", include="BUILD.gn", exclude=("out", "doc", ".ccache"), post_handler=lambda x: x.split('\n')):
        if "oh/out" in i:
            print(i)

View File

@ -0,0 +1,437 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import logging
import re
import ast
import json
import collections
from typing import *
if __name__ == '__main__':
from basic_tool import BasicTool
else:
from pkgs.basic_tool import BasicTool
class GnCommonTool:
    """
    Common helpers for processing BUILD.gn files.
    """
    @classmethod
    def _find_gn_variable_list(cls, content: str) -> List:
        """
        Collect gn variables of the form ${xxx} or $xxx occurring in content.
        :param content: string to scan; it is split on os.sep and each part is scanned in turn
        :return: variable names with their markers kept, eg: ${abc}, $abc
        :FIXME does not handle indirect assignment, ie b in: a = 'a'; b = a
        """
        result = list()
        splited = content.split(os.sep)
        patern = re.compile(r"\${.*?}")
        for item in splited:
            m = re.findall(patern, item)
            result.extend(m)
            if len(m) == 0 and "$" in item:
                item = item.strip('"')
                result.append(item[item.index("$"):])
        return result
    @classmethod
    def is_gn_variable(cls, target: str, quote_processed: bool = False) -> bool:
        """
        Check whether target is a gn variable.
        Rules: with quote_processed=True, anything starting with '$' is a
        variable; otherwise, a quoted target counts as a variable only in the
        "$xxx" / "${xxx}" forms, and an unquoted target is always treated as
        a variable. Examples:
            b = "xxx"
            c = b
            c = "${b}"
            "$p"
        :param target: string to check
        :param quote_processed: whether surrounding quotes were already stripped
        :return: whether target is a gn variable
        """
        target = target.strip()
        if quote_processed:
            return target.startswith("$")
        if target.startswith('"') and target.endswith('"'):
            target = target.strip('"')
            if target.startswith("${") and target.endswith("}"):
                return True
            elif target.startswith("$"):
                return True
            return False
        else:
            return True
    @classmethod
    def contains_gn_variable(cls, s: str, quote_processed: bool = False) -> bool:
        """
        Check whether string s contains a gn variable.
        """
        return cls.is_gn_variable(s, quote_processed) or ("$" in s)
    # cache used by find_variables_in_gn to reduce io
    __var_val_mem_dict = collections.defaultdict(str)
    @classmethod
    def find_variables_in_gn(cls, var_name_tuple: tuple, path: str, stop_tail: str = "home", use_cache: bool = False) -> \
            List[str]:
        """
        Look up the values of several gn variables at once.
        Variable names should be in their unprocessed form, eg:
            xxx
            "${xxx}"
            "$xxx"
        :param var_name_tuple: tuple of variable names to look up
        :param path: path of the file the variables appear in
        :param stop_tail: stop climbing once path no longer contains stop_tail
        :param use_cache: whether to use the in-memory cache
        :return: list of the variable values found
        NOTE(review): the result may contain fewer entries than
        var_name_tuple when some variables are not found — confirm callers
        that unpack the result can cope with that.
        """
        if os.path.isfile(path):
            path, _ = os.path.split(path)
        var_val_dict = collections.defaultdict(str)
        not_found_count = len(var_name_tuple)
        if use_cache:
            for var in var_name_tuple:
                val = GnCommonTool.__var_val_mem_dict[var]
                if val:
                    not_found_count -= 1
                    var_val_dict[var] = val
        # while (not path.endswith(stop_tail)) and not_found_count:
        while (stop_tail in path) and not_found_count:
            for v in var_name_tuple:
                pv = v.strip('"').lstrip("${").rstrip('}')
                # First grep lines matching: pv *= *\".*?\"
                # then drop lines containing '$',
                # then keep the first line,
                # and finally keep only the quoted part.
                # backup:begin
                cmd = fr"grep -Ern '{pv} *= *\".*?\"' --include=*.gn* {path} | grep -Ev '\$' " \
                      r"| head -n 1 | grep -E '\".*\"' -wo"
                output = BasicTool.execute(cmd, lambda x: x.strip().strip('"'))
                # backup:end
                if not output:
                    continue
                not_found_count -= 1
                var_val_dict[v] = output
                GnCommonTool.__var_val_mem_dict[v] = output
            path, _ = os.path.split(path)
        return list(var_val_dict.values())
    @classmethod
    def find_variables_in_gn_test(cls, var_name_tuple: tuple, path: str, stop_tail: str = "home", use_cache: bool = False) -> \
            List[str]:
        """
        Experimental variant of find_variables_in_gn: greps only for the
        assignment line, then parses the value out of that file with
        GnVariableParser instead of chained shell pipes.
        :param var_name_tuple: tuple of variable names to look up
        :param path: path of the file the variables appear in
        :param stop_tail: stop climbing when path ends with stop_tail
        :param use_cache: whether to use the in-memory cache
        :return: list of the variable values found
        """
        if os.path.isfile(path):
            path, _ = os.path.split(path)
        var_val_dict = collections.defaultdict(str)
        not_found_count = len(var_name_tuple)
        if use_cache:
            for var in var_name_tuple:
                val = GnCommonTool.__var_val_mem_dict[var]
                if val:
                    not_found_count -= 1
                    var_val_dict[var] = val
        flag = "${updater_faultloggerd_cfg}" in var_name_tuple[0]  # NOTE(review): unused debug leftover
        while not path.endswith(stop_tail) and not_found_count:
            for v in var_name_tuple:
                pv = v.strip('"').lstrip("${").rstrip('}')
                cmd = fr"grep -Ern '{pv} *=' --include=*.gn* {path}"
                cr = BasicTool.execute(cmd)
                if not cr:
                    break
                vfile = cr.split('\n')[0].split(':')[0]
                with open(vfile, 'r', encoding='utf-8') as f:
                    output =GnVariableParser.string_parser(pv, f.read())
                if not output:
                    continue
                not_found_count -= 1
                var_val_dict[v] = output
                GnCommonTool.__var_val_mem_dict[v] = output
            path, _ = os.path.split(path)
        return list(var_val_dict.values())
    @classmethod
    def find_variable_in_gn(cls, var_name: str, path: str, stop_tail: str = "home", use_cache: bool = False):
        """
        Look up the value of a single variable.
        :param var_name: variable name
        :param path: path to start searching from
        :param stop_tail: directory at which the search stops
        :param use_cache: whether to use the cache
        :return: one of the candidate values, or '' when not found
        """
        res = cls.find_variables_in_gn((var_name,), path, stop_tail, use_cache)
        if res:
            return res[0]
        return ""
    @classmethod
    def replace_gn_variables(cls, s: str, gn_path: str, stop_tail: str) -> str:
        """
        Replace gn variable names in s with their values. Note: s is not
        validated to actually contain gn variables.
        :param s: string to substitute into
        :param gn_path: gn file the string came from
        :param stop_tail: stop looking up variables at this directory
        :return: the string with variables replaced by their values
        NOTE(review): zip() silently drops variables whose value was not
        found, leaving them unreplaced in s.
        """
        variable_list = GnCommonTool._find_gn_variable_list(s)
        if len(variable_list) == 0:
            return s
        value_list = GnCommonTool.find_variables_in_gn(
            tuple(variable_list), path=gn_path, stop_tail=stop_tail)
        for k, v in dict(zip(variable_list, value_list)).items():
            s = s.replace(k, v)
        return s
    @classmethod
    def find_values_of_variable(cls, var_name: str, path: str, stop_tail: str = "home") -> list:
        """
        Look up the value of a variable; return all candidate values found.
        :param var_name: variable name
        :param path: file the variable appears in
        :param stop_tail: stop looking when the path no longer contains stop_tail
        :return: the possible values of the variable
        """
        if os.path.isfile(path):
            path, _ = os.path.split(path)
        result = list()
        v = var_name.strip('"').lstrip("${").rstrip('}')
        while stop_tail in path:
            cmd = fr"grep -Ern '^( *){v} *= *\".*?\"' --include=*.gn* {path}"
            output = os.popen(cmd).readlines()
            path = os.path.split(path)[0]
            if not output:
                continue
            for line in output:
                line = line.split('=')[-1].strip().strip('"')
                if len(line) == 0:
                    continue
                result.append(line)
            break
        return result
class SubsystemComponentNameFinder:
    """Resolve the (component_name, subsystem_name) a BUILD.gn file belongs to."""
    @classmethod
    def __find_subsystem_component_from_bundle(cls, gn_path: str, stop_tail: str = "home") -> Tuple[str, str]:
        """
        Starting from the full path of a BUILD.gn file, climb the directory
        tree looking for a bundle.json and read component/name and
        component/subsystem from it.
        :param gn_path: path of the gn file
        :param stop_tail: stop climbing when the directory ends with stop_tail
        :return: (component_name, subsystem_name)
        NOTE(review): the early return below yields the pair in the opposite
        order, but both values are empty strings there, so it is harmless.
        """
        filename = "bundle.json"
        component_name = str()
        subsystem_name = str()
        if stop_tail not in gn_path:
            return subsystem_name, component_name
        if os.path.isfile(gn_path):
            gn_path, _ = os.path.split(gn_path)
        while not gn_path.endswith(stop_tail):
            bundle_path = os.path.join(gn_path, filename)
            if not os.path.isfile(bundle_path):  # bundle.json not in this directory
                gn_path = os.path.split(gn_path)[0]
                continue
            with open(bundle_path, 'r', encoding='utf-8') as f:
                content = json.load(f)
            try:
                component_name = content["component"]["name"]
                subsystem_name = content["component"]["subsystem"]
            except KeyError:
                logging.warning(
                    "not found component/name or component/subsystem in bundle.json")
            finally:
                break
        return component_name, subsystem_name
    @classmethod
    def find_subsystem_component(cls, gn_file: str, project_path: str) -> Tuple[str, str]:
        """
        Find the component_name and subsystem for gn_file; fall back to
        bundle.json when the gn file does not provide them.
        :param gn_file: gn file path
        :param project_path: project root path
        :return: (component_name, subsystem_name)
        """
        part_var_flag = False  # whether the raw value taken from gn is itself a gn variable
        subsystem_var_flag = False
        component_pattern = r"component_name *= *(.*)"
        subsystem_pattern = r"subsystem_name *= *(.*)"
        with open(gn_file, 'r', encoding='utf-8') as f:
            content = f.read()
            subsystem_name = BasicTool.re_group_1(
                content, subsystem_pattern).strip()
            component_name = BasicTool.re_group_1(
                content, component_pattern).strip()
        if len(component_name) != 0:
            if GnCommonTool.is_gn_variable(component_name):
                part_var_flag = True
            else:
                component_name = component_name.strip('"')
        if len(subsystem_name) != 0:  # only tells us the keyword was matched at all
            if GnCommonTool.is_gn_variable(subsystem_name):
                subsystem_var_flag = True
            else:
                subsystem_name = subsystem_name.strip('"')
        if part_var_flag or subsystem_var_flag:
            # NOTE(review): find_variables_in_gn may return fewer than two
            # values when a variable is not found — this unpack would then
            # raise ValueError; confirm intended.
            s, c = GnCommonTool.find_variables_in_gn(
                (subsystem_name, component_name), gn_file, project_path)
            if part_var_flag:
                component_name = c
            if subsystem_var_flag:
                subsystem_name = s
        if len(subsystem_name) != 0 and len(component_name) != 0:
            return subsystem_name, component_name
        # if either is still missing, climb the tree looking for bundle.json
        t_component_name, t_subsystem_name = cls.__find_subsystem_component_from_bundle(
            gn_file, stop_tail=project_path)
        if len(t_component_name) != 0:
            component_name = t_component_name
        if len(t_subsystem_name) != 0:
            subsystem_name = t_subsystem_name
        return component_name, subsystem_name
class GnVariableParser:
    """Extract variable values from the text of a BUILD.gn file."""

    @classmethod
    def string_parser(cls, var: str, content: str) -> str:
        """
        Parse a variable whose value is a string; quotes are NOT stripped.
        :param var: variable name
        :param content: gn file content to parse
        :return: the quoted value, or '' when the variable is absent
        """
        pattern = r"{} *= *[\n]?(\".*?\")".format(var)
        matched = re.search(pattern, content, flags=re.S | re.M)
        return matched.group(1) if matched else str()

    @classmethod
    def list_parser(cls, var: str, content: str) -> List[str]:
        """
        Parse a variable whose value is a list; elements must all be number
        or string literals, and quotes are NOT stripped.
        :param var: variable name
        :param content: gn file content to parse
        :return: the evaluated list
        """
        pattern = r"{} *= *(\[.*?\])".format(var)
        matched = re.search(pattern, content, flags=re.S | re.M)
        raw = matched.group(1) if matched else str()
        return ast.literal_eval(raw.strip())
if __name__ == '__main__':
    # Ad-hoc manual test fixtures and scratch calls left in by the author.
    cc = \
        """
    target("shared_library", "mmp"){
        xxx
    }
    ohos_shared_library("pinauthservice") {
        sources = [
            "//base/useriam/pin_auth/services/modules/driver/src/pin_auth_driver_hdi.cpp",
            "//base/useriam/pin_auth/services/modules/driver/src/pin_auth_interface_adapter.cpp",
            "//base/useriam/pin_auth/services/modules/executors/src/pin_auth_executor_callback_hdi.cpp",
            "//base/useriam/pin_auth/services/modules/executors/src/pin_auth_executor_hdi.cpp",
            "//base/useriam/pin_auth/services/modules/inputters/src/i_inputer_data_impl.cpp",
            "//base/useriam/pin_auth/services/modules/inputters/src/pin_auth_manager.cpp",
            "//base/useriam/pin_auth/services/sa/src/pin_auth_service.cpp",
        ]
        configs = [
            ":pin_auth_services_config",
            "//base/useriam/user_auth_framework/common:iam_log_config",
            "//base/useriam/user_auth_framework/common:iam_utils_config",
        ]
        deps = [
            "//base/useriam/pin_auth/frameworks:pinauth_ipc",
            "//base/useriam/user_auth_framework/common/executors:userauth_executors",
            "//third_party/openssl:libcrypto_shared",
        ]
        external_deps = [
            "access_token:libaccesstoken_sdk",
            "c_utils:utils",
            "drivers_interface_pin_auth:libpin_auth_proxy_1.0",
            "hisysevent_native:libhisysevent",
            "hiviewdfx_hilog_native:libhilog",
            "ipc:ipc_core",
            "safwk:system_ability_fwk",
        ]
        t = [
            1,
            2,
            3
        ]
        tt = [
            aaa,
            bbb,
            ccc
        ]
        remove_configs = [ "//build/config/compiler:no_exceptions" ]
        subsystem_name = "useriam"
        part_name = "pin_auth"
    }"""
    s = """
    updater_usb_init_cfg_path = "//base/startup/init/services/etc/init.usb.cfg"
    updater_init_usb_configfs_path_cfg =
        "//drivers/peripheral/usb/cfg/init.usb.configfs.cfg"
    updater_faultloggerd_cfg =
        "//base/hiviewdfx/faultloggerd/services/config/faultloggerd.cfg"
    updater_hilog_cfg = "//base/hiviewdfx/hilog/services/hilogd/etc/hilogd.cfg"
    ohos_prebuilt_etc("updater_hilog.cfg") {
        source = "${updater_hilog_cfg}"
        install_images = [ "updater" ]
        part_name = "updater"
    }
    """
    # s is immediately reassigned below; the fixture above is kept for reference
    s = "\"${updater_faultloggerd_cfg}\""
    print(GnCommonTool.contains_gn_variable(s))
    # print(GnVariableParser.string_parser("updater_faultloggerd_cfg",s))
    # print(re.search(
    #     "updater_faultloggerd_cfg *= *[\n]?(\".*?\")", s, flags=re.S | re.M).group())
    # print(GnVariableParser.list_parser("t", cc))
    # print(len(GnVariableParser.list_parscer("t", cc)))
    # print(TargetNameParser.second_parser(cc))
    # print(GnCommonTool._find_gn_variable_list("a/${b}${e}/$c"))
    # print(GnCommonTool._find_gn_variable_list("abc_$abc"))
    # print(GnCommonTool.find_values_of_variable(
    #     "\"${OHOS_PROFILER_SUBSYS_NAME}\"", path="/home/aodongbiao/oh/third_party/abseil-cpp/absl/strings/BUILD.gn", stop_tail="/home/aodongbiao/oh"))
    ...

View File

@ -0,0 +1,119 @@
import xlwt
from xlwt import Worksheet
import typing
import logging
from typing import Optional
from collections.abc import Iterable
class SimpleExcelWriter:
    """Minimal xlwt-based xls writer: one (row, col) cursor per sheet, a styled
    header row and centre-aligned content rows."""
    def __init__(self, default_sheet_name: str = "sheet1"):
        self.__book = xlwt.Workbook(encoding='utf-8', style_compression=0)
        self.__sheet_dict = {
            default_sheet_name: self.__book.add_sheet(
                sheetname=default_sheet_name, cell_overwrite_ok=True)
        }
        self.__sheet_pos = {
            default_sheet_name: (0, 0)  # per-sheet write position; value is the next cell not yet written
        }
        self.__default_sheet_name = default_sheet_name
        # header style
        self.__head_style = xlwt.XFStyle()
        # content style
        self.__content_style = xlwt.XFStyle()
        # font
        font = xlwt.Font()
        font.bold = True
        # background colour
        pattern = xlwt.Pattern()
        pattern.pattern = xlwt.Pattern.SOLID_PATTERN
        pattern.pattern_fore_colour = 22  # background colour index
        # centred alignment
        alignment = xlwt.Alignment()
        alignment.horz = xlwt.Alignment.HORZ_CENTER  # horizontal
        alignment.vert = xlwt.Alignment.VERT_CENTER  # vertical
        self.__head_style.font = font
        self.__head_style.alignment = alignment
        self.__head_style.pattern = pattern
        self.__content_style.alignment = alignment
    def __increment_y(self, sheet_name: str, value: int = 1) -> int:
        # Advance the column cursor of sheet_name by value and return it.
        # NOTE(review): implicitly returns None when sheet_name is unknown.
        if sheet_name in self.__sheet_pos.keys():
            x, y = self.__sheet_pos.get(sheet_name)
            y = y + value
            self.__sheet_pos[sheet_name] = (x, y)
            return y
    def __increment_x(self, sheet_name: str, value: int = 1) -> int:
        # Advance the row cursor by value and reset the column cursor to 0.
        # NOTE(review): implicitly returns None when sheet_name is unknown.
        if sheet_name in self.__sheet_pos.keys():
            x, y = self.__sheet_pos.get(sheet_name)
            x = x + value
            self.__sheet_pos[sheet_name] = (x, 0)
            return x
    def append_line(self, content: list, sheet_name: str = None):
        # Write content as the next row of sheet_name (default sheet if None).
        sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
        if sheet_name not in self.__sheet_dict.keys():
            logging.error("sheet name '{}' not exist".format(sheet_name))
            return
        sheet: Worksheet = self.__sheet_dict.get(sheet_name)
        x, y = self.__sheet_pos.get(sheet_name)
        for ele in content:
            sheet.write(x, y, ele, style=self.__content_style)
            y = self.__increment_y(sheet_name)
        self.__increment_x(sheet_name)
    def write_merge(self, x0: int, y0: int, x1: int, y1: int, content: typing.Any,
                    sheet_name: str = None):
        # Write content into the merged cell range (x0,y0)-(x1,y1).
        # Takes (row0, col0, row1, col1) and reorders the arguments into
        # xlwt's write_merge(r1, r2, c1, c2) convention.
        sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
        if sheet_name not in self.__sheet_dict.keys():
            logging.error("sheet name '{}' not exist".format(sheet_name))
            return
        sheet: Worksheet = self.__sheet_dict.get(sheet_name)
        sheet.write_merge(x0, x1, y0, y1, content, style=self.__content_style)
    def set_sheet_header(self, headers: Iterable, sheet_name: str = None):
        """
        Write the header row of a sheet; only valid while the sheet is empty.
        """
        sheet_name = self.__default_sheet_name if sheet_name is None else sheet_name
        if sheet_name not in self.__sheet_dict.keys():
            logging.error("sheet name '{}' not exist".format(sheet_name))
            return
        x, y = self.__sheet_pos.get(sheet_name)
        if x != 0 or y != 0:
            logging.error(
                "pos of sheet '{}' is not (0,0). set_sheet_header must before write".format(sheet_name))
            return
        sheet: Worksheet = self.__sheet_dict.get(sheet_name)
        for h in headers:
            sheet.write(x, y, h, self.__head_style)
            y = self.__increment_y(sheet_name)
        self.__increment_x(sheet_name)
    def add_sheet(self, sheet_name: str, cell_overwrite_ok=True) -> Optional[xlwt.Worksheet]:
        # Create a new sheet; returns None (and logs) if the name exists.
        if sheet_name in self.__sheet_dict.keys():
            logging.error("sheet name '{}' has exist".format(sheet_name))
            return
        self.__sheet_dict[sheet_name] = self.__book.add_sheet(
            sheetname=sheet_name, cell_overwrite_ok=cell_overwrite_ok)
        self.__sheet_pos[sheet_name] = (0, 0)
        return self.__sheet_dict.get(sheet_name)
    def save(self, file_name: str):
        # Persist the workbook to file_name.
        self.__book.save(file_name)
if __name__ == '__main__':
    # Manual demo: build a workbook with three sheets, write a few rows and
    # save it next to the script.
    writer = SimpleExcelWriter(default_sheet_name="first")
    writer.add_sheet("second")
    writer.add_sheet("third")
    writer.set_sheet_header(["h", "m", "n"])
    writer.append_line([1, 2, 3])
    writer.append_line([2, 3, 4], "second")
    writer.append_line([3, 4, 5], "third")
    writer.append_line([3, 2, 1])
    writer.save("demo.xls")

View File

@ -0,0 +1,15 @@
import yaml
from typing import *
from yaml.loader import SafeLoader
class SimpleYamlTool:
    """Thin wrapper around PyYAML for reading configuration files."""

    @classmethod
    def read_yaml(cls, file_name: str, mode: str = "r", encoding: str = "utf-8") -> Dict:
        """Load file_name with yaml's SafeLoader and return the parsed mapping."""
        with open(file_name, mode, encoding=encoding) as fp:
            text = fp.read()
        return yaml.load(text, Loader=SafeLoader)
if __name__ == '__main__':
    # Manual demo with a hard-coded developer path.
    # NOTE(review): key "black_grep_dir" does not appear in the shipped
    # config.yaml (which defines "black_list") — confirm the intended key.
    config = SimpleYamlTool.read_yaml("/home/aodongbiao/build_static_check/tools/component_tools/rom_ram_analyzer/src/config.yaml")
    print(config["black_grep_dir"])

View File

@ -0,0 +1 @@
# Package version of the rom_ram_analyzer tooling.
VERSION = 1.0

View File

@ -0,0 +1,193 @@
import os
import sys
import argparse
import json
from typing import *
# from gn_lineno_collector import gn_lineno_collect
import preprocess
from pkgs.simple_yaml_tool import SimpleYamlTool
from pkgs.basic_tool import do_nothing, BasicTool
from get_subsystem_component import SC
from post_handlers import SOPostHandler, APostHandler, DefaultPostHandler, HAPPostHandler, LiteLibPostHandler, LiteLibS2MPostHandler
from template_processor import BaseProcessor, DefaultProcessor, StrResourceProcessor, ListResourceProcessor, LiteComponentPostHandler
from target_name_parser import *
from info_handlers import extension_handler, hap_name_handler, target_type_handler
"""
只给rom_analysis.py使用
"""
# # global variables
configs = SimpleYamlTool.read_yaml("config.yaml")
result_dict: Dict[str, Any] = dict()
project_path = BasicTool.abspath(configs.get("project_path"))
_sc_json: Dict[Text, Text] = configs.get("subsystem_component_json")
_sc_save = _sc_json.get("save")
_target_type = configs["target_type"]
_sc_output_path = _sc_json.get("filename")
sub_com_dict: Dict = SC.run(project_path, _sc_output_path, _sc_save)
collector_config: Tuple[BaseProcessor] = (
DefaultProcessor(project_path=project_path, # 项目根路径
result_dict=result_dict, # 保存结果的字典
# targte的类型名称,即xxx("yyy")中的xxx
target_type=_target_type[0],
# 用以进行匹配的模式串,包括匹配段落时作为前缀
match_pattern=fr"^( *){_target_type[0]}\(.*?\)",
sub_com_dict=sub_com_dict, # 从bundle.json中收集的subsystem_name和component_name信息
target_name_parser=TargetNameParser.single_parser, # 进行target_name解析的parser
other_info_handlers={
"extension": extension_handler,
}, # 解析其他信息的parser,{"字段名":该字段的parser}
unit_post_handler=SOPostHandler() # 对即将进行存储的unit字典的handler,会返回一个str作为存储时的key
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[1],
match_pattern=fr"^( *){_target_type[1]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=SOPostHandler(),
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[2],
match_pattern=fr"^( *){_target_type[2]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=APostHandler(),
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[3],
match_pattern=fr"^( *){_target_type[3]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=APostHandler(),
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[4],
match_pattern=fr"^( *){_target_type[4]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=DefaultPostHandler(),
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[5],
match_pattern=fr"^( *){_target_type[5]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=DefaultPostHandler(),
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[6],
match_pattern=fr"^( *){_target_type[6]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"real_target_type": target_type_handler,
"extension": extension_handler,
},
unit_post_handler=LiteLibPostHandler(),
S2MPostHandler=LiteLibS2MPostHandler,
),
DefaultProcessor(project_path=project_path, # hap有个hap_name
result_dict=result_dict,
target_type=_target_type[7],
match_pattern=fr"^( *){_target_type[7]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"hap_name": hap_name_handler,
"extension": extension_handler,
},
unit_post_handler=HAPPostHandler(),
),
StrResourceProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[8],
match_pattern=fr"^( *){_target_type[8]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=DefaultPostHandler(),
resource_field="source"
),
StrResourceProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[9],
match_pattern=fr"^( *){_target_type[9]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=DefaultPostHandler(),
resource_field="source"
),
ListResourceProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[10],
match_pattern=fr"^( *){_target_type[10]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"extension": extension_handler,
},
unit_post_handler=DefaultPostHandler(),
resource_field="sources"
),
StrResourceProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[11],
match_pattern=fr"^( *){_target_type[11]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
# "extension": extension_handler,
},
unit_post_handler=DefaultPostHandler(),
resource_field="source"
),
DefaultProcessor(project_path=project_path,
result_dict=result_dict,
target_type=_target_type[12],
match_pattern=fr"^( *){_target_type[12]}\(.*?\)",
sub_com_dict=sub_com_dict,
target_name_parser=TargetNameParser.single_parser,
other_info_handlers={
"real_target_type": target_type_handler,
# "extension": extension_handler,
},
unit_post_handler=LiteComponentPostHandler(),
),
)
__all__ = ["configs", "result_dict", "collector_config", "sub_com_dict"]
if __name__ == '__main__':
for c in collector_config:
c.run()
with open("demo.json", 'w', encoding='utf-8') as f:
json.dump(result_dict, f)

View File

@ -0,0 +1,113 @@
# root:
project_path: ~/oh
# 从bundle.json中取出的component和subsystem_name的信息
# used by get_subsystem_component.py config.py
subsystem_component_json:
save: true
filename: sub_com_info.json
output_file: result
# gn info
gn_info_file: gn_info.json
# 注意:如果target_type有了更改,要相应改变config.py中collector_config
target_type:
- shared_library
- ohos_shared_library
- static_library
- ohos_static_library
- executable
- ohos_executable
- lite_library
- ohos_hap
- ohos_prebuilt_etc
- ohos_prebuilt_para
- ohos_sa_profile
- ohos_prebuilt_shared_library
- lite_component
# 要分析的编译产物的根目录及各类型对应的子目录
product_dir:
ipcamera_hispark_taurus:
root: out/hispark_taurus/ipcamera_hispark_taurus/rootfs
relative:
bin: bin
so: usr/lib
etc: etc
rest: True # 是否分析其他目录下的并归到etc
ipcamera_hispark_taurus_linux:
root: out/hispark_taurus/ipcamera_hispark_taurus_linux/rootfs
relative:
bin: bin
so: usr/lib
etc: etc
rest: True
wifiiot_hispark_pegasus:
root: out/hispark_pegasus/wifiiot_hispark_pegasus
relative:
a: libs
etc: etc
rest: False
# 各类型文件的匹配顺序
query_order:
ipcamera_hispark_taurus:
so:
- shared_library
- ohos_shared_library
- ohos_prebuilt_shared_library
- lite_library
- lite_component
a:
- static_library
- ohos_static_library
- lite_library
bin:
- executable
- ohos_executable
- lite_component
ipcamera_hispark_taurus_linux:
so:
- shared_library
- ohos_shared_library
- ohos_prebuilt_shared_library
- lite_library
- lite_component
a:
- static_library
- ohos_static_library
- lite_library
bin:
- executable
- ohos_executable
- lite_component
wifiiot_hispark_pegasus:
a:
- static_library
- ohos_static_library
- lite_library
# extension and prefix of products
default_extension:
shared_library: .so
static_library: .a
app: .hap
default_prefix:
shared_library: lib
static_library: lib
# directories to skip when searching the source tree with 'grep'
black_list:
- .repo
- .ccache
- doc
- test
- build
- out

View File

@ -0,0 +1,131 @@
#! /usr/bin/python
import argparse
import os
import json
import logging
g_subsystem_path_error = list()  # subsystem paths listed in subsystem_config.json that do NOT exist on disk
# bundle.json path which cant get component path.
g_component_path_empty = list()
g_component_abs_path = list()  # destPath can't be absolute path.
def get_subsystem_components(ohos_path: str):
    """Scan build/subsystem_config.json and every bundle.json below each
    subsystem path; return {subsystem_name: [{component_name: destPath}, ...]}.

    Side effects: appends anomalies to the module-level g_* diagnostic lists.
    Relies on the external 'find' command via os.popen.
    """
    subsystem_json_path = os.path.join(
        ohos_path, r"build/subsystem_config.json")
    subsystem_item = {}
    with open(subsystem_json_path, 'rb') as f:
        subsystem_json = json.load(f)
    # get subsystems
    for i in subsystem_json:
        subsystem_name = subsystem_json[i]["name"]
        subsystem_path = os.path.join(ohos_path, subsystem_json[i]["path"])
        if not os.path.exists(subsystem_path):
            g_subsystem_path_error.append(subsystem_path)
            continue
        cmd = 'find ' + subsystem_path + ' -name bundle.json'
        bundle_json_list = os.popen(cmd).readlines()
        # get components
        component_list = []
        for j in bundle_json_list:
            bundle_path = j.strip()
            with open(bundle_path, 'rb') as bundle_file:
                bundle_json = json.load(bundle_file)
            component_item = {}
            if 'segment' in bundle_json and 'destPath' in bundle_json["segment"]:
                destpath = bundle_json["segment"]["destPath"]
                component_item[bundle_json["component"]["name"]] = destpath
                if os.path.isabs(destpath):
                    g_component_abs_path.append(destpath)
            else:
                component_item[bundle_json["component"]["name"]
                               ] = "Unknow. Please check " + bundle_path
                g_component_path_empty.append(bundle_path)
            component_list.append(component_item)
        subsystem_item[subsystem_name] = component_list
    return subsystem_item
def get_subsystem_components_modified(ohos_root) -> dict:
    """Invert the subsystem->components mapping into a path-keyed dict:
    {component_path: {"subsystem": ..., "component": ...}}.

    Returns None when the underlying scan yields nothing.
    """
    subsystem_info = get_subsystem_components(ohos_root)
    if subsystem_info is None:
        return None
    return {
        comp_path: {'subsystem': subsystem_name, 'component': comp_name}
        for subsystem_name, comp_list in subsystem_info.items()
        for comp_entry in comp_list
        for comp_name, comp_path in comp_entry.items()
    }
def export_to_json(subsystem_item: dict, output_filename: str):
    """Serialize *subsystem_item* as pretty-printed JSON to *output_filename*."""
    serialized = json.dumps(
        subsystem_item, indent=4, separators=(', ', ': '))
    with open(output_filename, 'w') as out_file:
        out_file.write(serialized)
    logging.info("output path: " + output_filename)
def parse_args():
    """Parse CLI arguments, validate the project root and resolve the
    output path (defaults to the current directory)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("project", help="project root path.", type=str)
    parser.add_argument("-o", "--outpath",
                        help="specify an output path.", type=str)
    parsed = parser.parse_args()
    ohos_path = os.path.abspath(parsed.project)
    if not is_project(ohos_path):
        logging.error("'" + ohos_path + "' is not a valid project path.")
        exit(1)
    output_path = parsed.outpath if parsed.outpath else r'.'
    return ohos_path, output_path
def is_project(path: str) -> bool:
    '''
    @func: Roughly decide whether *path* is a source-code project root.
    @note: heuristic -- a project root contains a .repo/manifests directory.
    '''
    # os.path.join instead of '+ "/..."' string concatenation: portable
    # across platforms and avoids double-separator artifacts.
    p = os.path.normpath(path)
    return os.path.exists(os.path.join(p, '.repo', 'manifests'))
def print_warning_info():
    '''@func: Log the abnormal cases collected during the scan, if any.
    '''
    if not (g_component_path_empty or g_component_abs_path):
        return
    logging.warning("------------ warning info ------------------")
    if g_component_path_empty:
        logging.warning("can't find destPath in:")
        logging.warning(g_component_path_empty)
    if g_component_abs_path:
        logging.warning("destPath can't be absolute path:")
        logging.warning(g_component_abs_path)
class SC:
    """Facade for other modules: run the subsystem/component scan and
    optionally persist the result."""

    @classmethod
    def run(cls, project_path: str, output_path: str = None, save_result: bool = True):
        """Scan *project_path*; save to *output_path* when requested.

        :return: {component_path: {"subsystem": ..., "component": ...}}
        """
        resolved_root = os.path.abspath(os.path.expanduser(project_path))
        info = get_subsystem_components_modified(resolved_root)
        if save_result and output_path:
            export_to_json(info, output_path)
        print_warning_info()
        return info
__all__ = ["SC"]

if __name__ == '__main__':
    # CLI entry: scan the given project and export the
    # subsystem/component mapping as JSON.
    ohos_path, output_path = parse_args()
    info = get_subsystem_components_modified(ohos_path)
    export_to_json(info, output_path)
    print_warning_info()

View File

@ -0,0 +1,38 @@
from typing import *
import os
from collections import defaultdict
import preprocess
from pkgs.basic_tool import BasicTool
from pkgs.simple_yaml_tool import SimpleYamlTool
config = SimpleYamlTool.read_yaml("config.yaml")
project_path = config.get("project_path")
# Fix: materialize as a list. The previous map() iterator could only be
# consumed once, so every tuple(black_list) after the first call saw an
# empty sequence and the grep exclude list silently stopped working.
black_list = [os.path.join(project_path, p) for p in config.get("black_list")]
def gn_lineno_collect(match_pattern: str, project_path: str) -> DefaultDict[str, List[int]]:
    """
    Search the whole project for BUILD.gn files containing a given target type.
    :param match_pattern: pattern for grep (extended regex supported)
    :param project_path: root directory to search
    :return: {gn_file: [line_no_1, line_no_2, ..]}
    """
    def handler(content: Text) -> List[str]:
        # split grep output into non-empty stripped lines
        return list(filter(lambda y: len(y) > 0, list(map(lambda x: x.strip(), content.split("\n")))))

    grep_list = BasicTool.grep_ern(match_pattern, path=project_path, include="BUILD.gn", exclude=tuple(black_list),
                                   post_handler=handler)
    gn_line_dict: DefaultDict[str, List[int]] = defaultdict(list)
    for gl in grep_list:
        # grep -n output is "file:line:content"; the matched content may
        # itself contain ':', so split at most twice (fixes unpack errors).
        gn_file, line_no, _ = gl.split(":", 2)
        # store real ints, as the return annotation promises
        gn_line_dict[gn_file].append(int(line_no))
    return gn_line_dict
if __name__ == '__main__':
    # Smoke test: list every ohos_shared_library target found under the
    # configured project path (filtered to "oh/out" entries for brevity).
    res = gn_lineno_collect(
        "^( *)ohos_shared_library\(.*?\)", BasicTool.abspath(project_path))
    for k, v in res.items():
        if "oh/out" in k:
            print("file={}, line_no={}".format(k, v))

View File

@ -0,0 +1,19 @@
import logging
from typing import *
import preprocess
from pkgs.gn_common_tool import GnVariableParser
def extension_handler(paragraph: Text):
    """Return the output_extension value (quotes stripped) from a gn target paragraph."""
    raw_value = GnVariableParser.string_parser("output_extension", paragraph)
    return raw_value.strip('"')
def hap_name_handler(paragraph: Text):
    """Return the hap_name value (quotes stripped) from a gn target paragraph."""
    raw_value = GnVariableParser.string_parser("hap_name", paragraph)
    return raw_value.strip('"')
def target_type_handler(paragraph: Text):
    """Return the target_type value from a gn target paragraph; warn when empty."""
    target_type = GnVariableParser.string_parser(
        "target_type", paragraph).strip('"')
    if not target_type:
        logging.warning("parse 'target_type' failed, maybe it's a variable")
    return target_type

View File

@ -0,0 +1,140 @@
from typing import *
from abc import ABC, abstractmethod
import copy
import logging
import preprocess
from pprint import pprint
from pkgs.simple_yaml_tool import SimpleYamlTool
_config = SimpleYamlTool.read_yaml("./config.yaml")
class BasePostHandler(ABC):
    """Callable that maps one collected gn target (a dict) to the file name
    expected in the build output; the returned string is used as the
    storage key in the result dict."""

    @abstractmethod
    def run(self, unit: Dict[str, AnyStr]) -> str:
        ...

    def __call__(self, unit: Dict[str, AnyStr]) -> str:
        # handlers are used as plain callables; delegate to run()
        return self.run(unit)
class DefaultPostHandler(BasePostHandler):
    """Fallback handler: the artifact name is output_name unchanged."""

    def run(self, unit: Dict[str, AnyStr]):
        artifact_name = unit["output_name"]
        return artifact_name
class HAPPostHandler(BasePostHandler):
    """Handler for ohos_hap targets: prefer hap_name over output_name and
    append the configured hap extension."""

    def run(self, unit: Dict[str, AnyStr]):
        hap_extension = _config.get("default_extension").get("app")
        name_from_gn = unit.get("hap_name")
        base_name = name_from_gn if name_from_gn else unit["output_name"]
        return base_name + hap_extension
class SOPostHandler(BasePostHandler):
    """Handler for shared_library targets: ensure the configured lib prefix
    and a shared-library extension (target-local extension wins)."""

    def run(self, unit: Dict[str, AnyStr]):
        name = unit["output_name"]
        lib_prefix = _config.get("default_prefix").get("shared_library")
        so_extension = unit.get("extension") or _config.get(
            "default_extension").get("shared_library")
        if not name.startswith(lib_prefix):
            name = lib_prefix + name
        return name + so_extension
class APostHandler(BasePostHandler):
    """Handler for static_library targets: ensure the configured lib prefix
    and the static-library extension."""

    def run(self, unit: Dict[str, AnyStr]):
        name = unit["output_name"]
        lib_prefix = _config.get("default_prefix").get("static_library")
        a_extension = _config.get("default_extension").get("static_library")
        if not name.startswith(lib_prefix):
            name = lib_prefix + name
        return name + a_extension
class LiteLibPostHandler(BasePostHandler):
    """Handler for lite_library targets: prefix/extension depend on the
    resolved real_target_type (static or shared); anything else gets
    neither prefix nor extension."""

    def run(self, unit: Dict[str, AnyStr]):
        real_type = unit["real_target_type"]
        name = unit["output_name"]
        if real_type in ("static_library", "shared_library"):
            lib_prefix = _config.get("default_prefix").get(real_type)
            lib_extension = _config.get("default_extension").get(real_type)
        else:
            lib_prefix = ""
            lib_extension = ""
        if not name.startswith(lib_prefix):
            name = lib_prefix + name
        return name + lib_extension
class LiteComponentPostHandler(BasePostHandler):
    """Handler for lite_component targets.

    shared_library components get the configured lib prefix/extension;
    executables keep their bare name; anything else is marked as a
    "virtual node" (a grouping target with no real artifact).
    """

    def run(self, unit: Dict[str, AnyStr]):
        tp = unit["real_target_type"]
        output_name = unit["output_name"]
        # Fix: removed the dead read of unit["output_extension"] -- its
        # value was unconditionally overwritten on every branch below.
        if tp == "shared_library":
            prefix = _config.get("default_prefix").get("shared_library")
            extension = _config.get("default_extension").get("shared_library")
        else:
            if tp != "executable":
                unit["description"] = "virtual node"
            prefix = str()
            extension = str()
        return prefix + output_name + extension
"""
==========================分割线===========================
"""
def LiteLibS2MPostHandler(unit: Dict, result_dict: Dict) -> None:
    """Register the complementary variant(s) of a lite_library.

    A lite_library may be built as either .a or .so; for a target collected
    as one variant, also record the other variant (marked "may not exist").
    When the real type is unknown, record both variants.

    :param unit: the collected target info
    :param result_dict: global result dict to store into
    """
    def save_variant(target_type: str) -> None:
        # store a deep-copied unit under the key of the given variant type
        variant = copy.deepcopy(unit)
        variant["real_target_type"] = target_type
        key = LiteLibPostHandler()(variant)
        variant["description"] = "may not exist"
        result_dict["lite_library"][key] = variant

    rt = unit.get("real_target_type")
    if rt == "shared_library":
        save_variant("static_library")
    elif rt == "static_library":
        save_variant("shared_library")
    else:
        logging.warning(
            f"target type should be 'shared_library' or 'static_library', but got '{rt}'")
        save_variant("shared_library")
        save_variant("static_library")
if __name__ == '__main__':
    # Smoke test: run the shared-library handler on a minimal unit.
    # Expected: "libmmp" plus the configured shared-library extension.
    h = SOPostHandler()
    pseudo_d = {"output_name": "libmmp"}
    print(h(pseudo_d))

View File

@ -0,0 +1,4 @@
import sys
import os
# Add the parent directory to the module search path so that sibling
# packages (e.g. pkgs) can be imported by modules that import preprocess first.
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

View File

@ -0,0 +1,284 @@
import sys
import argparse
import json
import logging
import os
from typing import *
import copy
from pprint import pprint
import preprocess
from time import time
from concurrent.futures import ThreadPoolExecutor, Future
from threading import RLock
import collections
from gn_lineno_collector import gn_lineno_collect
from config import result_dict, collector_config, configs, project_path, sub_com_dict
# from gn_info_collect import GnInfoCollector
from pkgs.basic_tool import BasicTool
from pkgs.gn_common_tool import GnCommonTool
from pkgs.simple_excel_writer import SimpleExcelWriter
"""
1. 先收集BUILD.gn中的target信息
2. 然后根据编译产物到1中进行搜索,匹配其所属的部件
对于ohos开头的template,主要根据其component字段和subsystem_name字段来归数其部件同时也要考虑install_dir字段
对于gn原生的template,主要根据bundle.json中的字段来归属其部件
对于找不到的,可以模糊匹配,,有产物libxxx,则可以在所有的BUILD.gn中搜索xxx,并设置一个阀值予以过滤
"""
def parse_args():
    """Parse the command line.

    Fix: ``type=bool`` is broken with argparse -- ``bool("False")`` is True,
    so any value passed to -r used to enable recollection. Parse the string
    explicitly instead; the default stays True.
    """
    def str_to_bool(value: str) -> bool:
        # accept common "false" spellings; everything else means True
        return value.strip().lower() not in ("false", "0", "no", "n", "")

    parser = argparse.ArgumentParser(
        description="analysis rom size of L0 and L1 product")
    parser.add_argument("-p", "--product_name", type=str, default="ipcamera_hispark_taurus_linux",
                        help="product name. eg: -p ipcamera_hispark_taurus")
    parser.add_argument("-r", "--recollect_gn", type=str_to_bool,
                        default=True, help="if recollect gn info or not")
    args = parser.parse_args()
    return args
class RomAnalysisTool:
    """Scan a product's build-output directory, match every artifact back to
    the component that produced it (using the gn info collected beforehand),
    and save per-component ROM statistics as json and xls."""

    @classmethod
    def collect_gn_info(cls):
        """Run all configured gn collectors concurrently and dump the merged
        ``result_dict`` into the json file named by config ``gn_info_file``."""
        with ThreadPoolExecutor(max_workers=len(collector_config) + 1) as pool:
            future_list: List[Future] = list()
            for c in collector_config:
                future_list.append(pool.submit(c))
            for f in future_list:
                # propagate any collector exception
                f.result()
        gn_info_file = configs["gn_info_file"]
        with open(gn_info_file, 'w', encoding='utf-8') as f:
            json.dump(result_dict, f, indent=4)

    @classmethod
    def __find_files(cls, product_name: str) -> Dict[str, List[str]]:
        """Collect the build artifacts of *product_name*.

        :return: {artifact_category: [full_path, ...]}, eg {"so": [...]}
        """
        product_dir: Dict[str, Dict] = configs["product_dir"].get(
            f"{product_name}")
        # Fix: the original tested `product_name` (always truthy here); the
        # config lookup result is what can be missing.
        if not product_dir:
            logging.error(
                f"product_name '{product_name}' not found in the config.yaml")
            exit(1)
        product_path_dit: Dict[str, str] = dict()  # category -> absolute dir
        root_dir = product_dir.get("root")
        root_dir = os.path.join(project_path, root_dir)
        relative_dir: Dict[str, str] = product_dir.get("relative")
        if not relative_dir:
            logging.warning(
                f"'{relative_dir}' of {product_name} not found in the config.yaml")
            exit(1)
        # everything that is not so/a/hap/bin ends up in "etc"
        for k, v in relative_dir.items():
            product_path_dit[k] = os.path.join(root_dir, v)
        # collect artifact files; product_dict format: {"so": ["a.so", "b.so"]}
        product_dict: Dict[str, List[str]] = dict()
        for k, v in product_path_dit.items():
            if not os.path.exists(v):
                logging.warning(f"dir '{v}' not exist")
            product_dict[k] = BasicTool.find_files_with_pattern(v)  # full paths
        if product_dir.get("rest"):
            # all directories other than those configured under "relative"
            # are attributed to "etc" (kept inside this branch so that
            # rest_dir_list is always defined when used)
            rest_dir_list: List[str] = os.listdir(root_dir)
            for v in relative_dir.values():
                # FIXME relative entries containing '/', eg a/b/c, need
                # further special handling
                if '/' in v:
                    v = os.path.split(v)[0]
                if v in rest_dir_list:
                    rest_dir_list.remove(v)
                else:
                    logging.warning(
                        f"config error: {v} not found in {product_dir}")
            if "etc" not in product_dict.keys():
                product_dict["etc"] = list()
            for r in rest_dir_list:
                product_dict["etc"].extend(
                    BasicTool.find_files_with_pattern(os.path.join(root_dir, r)))
        return product_dict

    @classmethod
    def collect_product_info(cls, product_name: str):
        """Find all build artifacts of *product_name*, persist them to
        ``{product_name}_product.json`` and return the artifact dict."""
        product_dict: Dict[str, List[str]] = cls.__find_files(product_name)
        with open(f"{product_name}_product.json", 'w', encoding='utf-8') as f:
            json.dump(product_dict, f, indent=4)
        return product_dict

    @classmethod
    def _put(cls, sub: str, com: str, unit: Dict, rom_size_dict: Dict):
        """Accumulate one artifact *unit* into the nested result dict,
        updating size/count at total, subsystem and component level."""
        size = unit.get("size")
        if not rom_size_dict.get("size"):  # total size
            rom_size_dict["size"] = 0
        if not rom_size_dict.get(sub):  # subsystem level
            rom_size_dict[sub]: Dict[str, Dict] = dict()
            rom_size_dict[sub]["size"] = 0
            rom_size_dict[sub]["count"] = 0
        if not rom_size_dict.get(sub).get(com):  # component level
            rom_size_dict.get(sub)[com] = dict()
            rom_size_dict[sub][com]["filelist"] = list()
            rom_size_dict[sub][com]["size"] = 0
            rom_size_dict[sub][com]["count"] = 0
        rom_size_dict[sub][com]["filelist"].append(unit)
        rom_size_dict[sub][com]["size"] += size
        rom_size_dict[sub][com]["count"] += 1
        rom_size_dict[sub]["size"] += size
        rom_size_dict[sub]["count"] += 1
        rom_size_dict["size"] += size

    @classmethod
    def _fuzzy_match(cls, file_name: str) -> Tuple[str, str, str]:
        """Last-resort attribution: grep the artifact's stripped base name in
        all BUILD.gn files and pick the gn file with the most hits.

        :return: (gn_file, subsystem_name, component_name); empty strings
                 when nothing matched.
        """
        _, base_name = os.path.split(file_name)
        # strip common library prefix/suffixes before searching
        if base_name.startswith("lib"):
            base_name = base_name[3:]
        if base_name.endswith(".a"):
            base_name = base_name[:base_name.index(".a")]
        if base_name.endswith(".z.so"):
            base_name = base_name[:base_name.index(".z.so")]
        elif base_name.endswith(".so"):
            base_name = base_name[:base_name.index(".so")]
        exclude_dir = [os.path.join(project_path, x)
                       for x in configs["black_list"]]
        exclude_dir.append("test")
        grep_result: List[str] = BasicTool.grep_ern(
            base_name, project_path, include="BUILD.gn",
            exclude=tuple(exclude_dir),
            post_handler=lambda out: list(filter(lambda line: len(line) > 0, out.split('\n'))))
        if not grep_result:
            return str(), str(), str()
        gn_dict: Dict[str, int] = collections.defaultdict(int)
        for g in grep_result:
            gn = g.split(':')[0].replace(project_path, "").lstrip(os.sep)
            gn_dict[gn] += 1
        gn_file, _ = collections.Counter(gn_dict).most_common(1)[0]
        for k, v in sub_com_dict.items():
            if gn_file.startswith(k):
                return gn_file, v.get("subsystem"), v.get("component")
        return str(), str(), str()

    @classmethod
    def save_as_xls(cls, result_dict: Dict, product_name: str) -> None:
        """Flatten the nested result dict into an xls sheet, merging the
        subsystem and component cells over their row spans."""
        header = ["subsystem_name", "component_name",
                  "output_file", "size(Byte)"]
        tmp_dict = copy.deepcopy(result_dict)
        excel_writer = SimpleExcelWriter("rom")
        excel_writer.set_sheet_header(headers=header)
        subsystem_start_row = 1
        subsystem_end_row = 0
        subsystem_col = 0
        component_start_row = 1
        component_end_row = 0
        component_col = 1
        del tmp_dict["size"]
        for subsystem_name in tmp_dict.keys():
            subsystem_dict = tmp_dict.get(subsystem_name)
            subsystem_size = subsystem_dict.get("size")
            subsystem_file_count = subsystem_dict.get("count")
            del subsystem_dict["count"]
            del subsystem_dict["size"]
            subsystem_end_row += subsystem_file_count
            for component_name in subsystem_dict.keys():
                component_dict: Dict[str, int] = subsystem_dict.get(
                    component_name)
                component_size = component_dict.get("size")
                component_file_count = component_dict.get("count")
                del component_dict["count"]
                del component_dict["size"]
                component_end_row += component_file_count
                for fileinfo in component_dict.get("filelist"):
                    file_name = fileinfo.get("file_name")
                    file_size = fileinfo.get("size")
                    excel_writer.append_line(
                        [subsystem_name, component_name, file_name, file_size])
                excel_writer.write_merge(component_start_row, component_col,
                                         component_end_row, component_col,
                                         component_name)
                component_start_row = component_end_row + 1
            excel_writer.write_merge(subsystem_start_row, subsystem_col,
                                     subsystem_end_row, subsystem_col,
                                     subsystem_name)
            subsystem_start_row = subsystem_end_row + 1
        output_dir, base_name = os.path.split(configs["output_file"])
        # Fix: the original did list(output_dir), splitting the directory
        # string into single characters before os.path.join.
        output_name = os.path.join(
            output_dir, product_name + "_" + base_name + ".xls")
        excel_writer.save(output_name)

    @classmethod
    def analysis(cls, product_name: str, product_dict: Dict[str, List[str]]):
        """Attribute every artifact of *product_name* to a component.

        Match order: exact match against collected gn info, then fuzzy
        grep match, finally "others". Results are written to
        ``{product_name}_{output_file}.json`` and the xls sheet.
        """
        gn_info_file = configs["gn_info_file"]
        with open(gn_info_file, 'r', encoding='utf-8') as f:
            gn_info = json.load(f)
        query_order: Dict[str, List[str]
                          ] = configs["query_order"][product_name]
        query_order["etc"] = configs["target_type"]
        rom_size_dict: Dict = dict()
        # product_dict: {"a": ["a.txt", ...]}
        for t, l in product_dict.items():
            for f in l:  # every artifact file
                # query_order: {"a": ["static_library", ...]}
                find_flag = False
                type_list = query_order.get(t)
                _, base_name = os.path.split(f)
                size = os.path.getsize(f)
                if not type_list:
                    logging.warning(
                        f"'{t}' not found in query_order of the config.yaml")
                    continue
                for tn in type_list:  # tn example: ohos_shared_library
                    output_dict: Dict[str, Dict] = gn_info.get(tn)
                    if not output_dict:
                        logging.warning(
                            f"'{tn}' not found in the {gn_info_file}")
                        continue
                    d = output_dict.get(base_name)
                    if not d:
                        continue
                    d["size"] = size
                    d["file_name"] = f.replace(project_path, "")
                    cls._put(d["subsystem_name"],
                             d["component_name"], d, rom_size_dict)
                    find_flag = True
                if not find_flag:
                    # fuzzy match via grep
                    psesudo_gn, sub, com = cls._fuzzy_match(f)
                    if sub and com:
                        cls._put(sub, com, {
                            "subsystem_name": sub,
                            "component_name": com,
                            "psesudo_gn_path": psesudo_gn,
                            "description": "fuzzy match",
                            "file_name": f.replace(project_path, ""),
                            "size": size,
                        }, rom_size_dict)
                        find_flag = True
                if not find_flag:
                    # unattributable: bucket under others/others
                    cls._put("others", "others", {
                        "file_name": f.replace(project_path, ""),
                        "size": size,
                    }, rom_size_dict)
        output_dir, base_output_filename = os.path.split(
            configs["output_file"])
        # same path-join fix as in save_as_xls
        output_file = os.path.join(
            output_dir, product_name + "_" + base_output_filename + ".json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(rom_size_dict, f, indent=4)
        cls.save_as_xls(rom_size_dict, product_name)
def main():
    """Entry point: optionally re-collect gn info, scan the product's build
    output, then attribute artifacts and write the reports."""
    args = parse_args()
    product_name = args.product_name
    if args.recollect_gn:
        RomAnalysisTool.collect_gn_info()
    product_info: Dict[str, List[str]] = RomAnalysisTool.collect_product_info(
        product_name)
    RomAnalysisTool.analysis(product_name, product_info)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,94 @@
import os
import logging
import json
import re
from typing import *
from pkgs.gn_common_tool import GnCommonTool
class SubsystemComponentNameFinder:
    """Resolve which component/subsystem a BUILD.gn file belongs to, using
    the gn content first and a bundle.json walk-up as fallback."""

    @classmethod
    def _find_subsystem_component_from_bundle(cls, gn_path: str, stop_tail: str = "home") -> Tuple[str, str]:
        """
        Walk up from the directory of *gn_path* looking for a bundle.json
        file and read component name/subsystem from it.

        The walk stops when the current path ends with *stop_tail*
        (normally the project root). Only the FIRST bundle.json found is
        consulted; the `finally: break` below exits the loop either way.
        """
        filename = "bundle.json"
        subsystem_name = str()
        component_name = str()
        if stop_tail not in gn_path:
            # gn_path is outside the tree rooted at stop_tail; give up
            logging.error("{} not in {}".format(stop_tail, gn_path))
            return subsystem_name, component_name
        if os.path.isfile(gn_path):
            gn_path, _ = os.path.split(gn_path)
        while not gn_path.endswith(stop_tail):
            bundle_path = os.path.join(gn_path, filename)
            if not os.path.isfile(bundle_path):  # no bundle.json here: go one level up
                gn_path, _ = os.path.split(gn_path)
                continue
            with open(bundle_path, 'r', encoding='utf-8') as f:
                content = json.load(f)
            try:
                component_name = content["component"]["name"]
                subsystem_name = content["component"]["subsystem"]
            except KeyError:
                logging.warning(
                    "not found component/name or component/subsystem in bundle.json")
            finally:
                # stop after the first bundle.json, found or not parsable
                break
        return subsystem_name, component_name

    @classmethod
    def _parse_subsystem_component(cls, content: str) -> Tuple[Text, Text]:
        """
        Extract the subsystem_name and component_name fields from raw gn
        content. Returns empty strings for fields that are absent.
        """
        subsystem_name = str()
        component_name = str()
        subsystem = re.search(r"subsystem_name *=\s*(\S*)", content)
        part = re.search(r"component_name *=\s*(\S*)", content)
        if subsystem:
            subsystem_name = subsystem.group(1)
        if part:
            component_name = part.group(1)
        return subsystem_name, component_name

    @classmethod
    def find_part_subsystem(cls, gn_file: str, project_path: str) -> Tuple[Text, Text]:
        """
        Find the component_name and subsystem of *gn_file*.

        Values found in the gn file may be gn variables; those are resolved
        via GnCommonTool. Anything still missing is looked up by walking up
        to the nearest bundle.json.
        FIXME targets in one gn file do not necessarily belong to the same
        component (eg hap packages).
        """
        part_var_flag = False  # whether the raw gn value is a variable
        subsystem_var_flag = False
        var_list = list()
        with open(gn_file, 'r', encoding='utf-8') as f:
            subsystem_name, component_name = cls._parse_subsystem_component(f.read())
        if len(component_name) != 0 and GnCommonTool.is_gn_variable(component_name):
            part_var_flag = True
            var_list.append(component_name)
        if len(subsystem_name) != 0 and GnCommonTool.is_gn_variable(subsystem_name):
            subsystem_var_flag = True
            var_list.append(subsystem_name)
        if part_var_flag and subsystem_var_flag:
            component_name, subsystem_name = GnCommonTool.find_variables_in_gn(
                tuple(var_list), gn_file, project_path)
        elif part_var_flag:
            component_name = GnCommonTool.find_variables_in_gn(
                tuple(var_list), gn_file, project_path)[0]
        elif subsystem_var_flag:
            subsystem_name = GnCommonTool.find_variables_in_gn(
                tuple(var_list), gn_file, project_path)[0]
        if len(component_name) != 0 and len(subsystem_name) != 0:
            return component_name, subsystem_name
        # at least one is still missing: walk up looking for bundle.json
        t_component_name, t_subsystem_name = cls._find_subsystem_component_from_bundle(
            gn_file, stop_tail=project_path)
        if len(component_name) == 0:
            component_name = t_component_name
        if len(subsystem_name) == 0:
            subsystem_name = t_subsystem_name
        return component_name, subsystem_name

View File

@ -0,0 +1,25 @@
import sys
from typing import *
import preprocess
from pkgs.basic_tool import BasicTool
class TargetNameParser:
    """Parsers extracting the target name from a gn target declaration."""

    @classmethod
    def single_parser(cls, paragraph: Text) -> str:
        """
        Find the name of a target declared with a single argument,
        eg shared_library("xxx").
        :param paragraph: paragraph to parse
        :return: the target name; if it is a gn variable it is NOT resolved
        """
        return BasicTool.re_group_1(paragraph, r"\w+\((.*)\)")

    @classmethod
    def second_parser(cls, paragraph: Text) -> str:
        """
        Find the name of a target declared as target("shared_library", "xxx"),
        ie the second argument inside the parentheses.
        :param paragraph: paragraph to parse
        :return: the target name; if it is a gn variable it is NOT resolved
        """
        return BasicTool.re_group_1(paragraph, r"\w+\(.*?, *(.*?)\)")

View File

@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from threading import RLock
from typing import *
from abc import ABC, abstractmethod
from collections import defaultdict
from pprint import pprint
import os
import logging
from gn_lineno_collector import gn_lineno_collect
from pkgs.basic_tool import do_nothing, BasicTool
from pkgs.gn_common_tool import GnCommonTool, SubsystemComponentNameFinder, GnVariableParser
from post_handlers import *
TYPE = Literal["str", "list"]
class BaseProcessor(ABC):
    """Base class for gn target collectors.

    extend and usage:
    DerivedClass(BaseProcessor):
        def call():
            # your implementation
    DerivedClass(project_path,result_dict,...)
    DerivedClass()
    """

    def __init__(self,
                 project_path: str,
                 result_dict: Dict[str, Dict[str, Dict]],
                 target_type: str,
                 match_pattern: str,
                 sub_com_dict: Dict[str, Dict[str, str]],
                 target_name_parser: Callable[[Text], Text] = do_nothing,
                 other_info_handlers: Dict[str, Callable[[
                     Text], Union[str, list]]] = dict(),
                 unit_post_handler: BasePostHandler = do_nothing,
                 resource_field: str = None,
                 S2MPostHandler: Callable[[Dict, Dict], None] = None
                 ):
        """
        :param project_path: project root path
        :param result_dict: dict storing the collection result
        :param target_type: gn target type, eg "shared_library"
        :param match_pattern: extended regex used to grep the targets
        :param sub_com_dict: subsystem/component info (with their root paths)
            loaded from the result of get_subsystem_component.py
        :param target_name_parser: callable that parses the target name
        :param other_info_handlers: handlers for extra target attributes,
            eg {"sources": SourcesParser} -- each handler is a callable
            taking the target paragraph as a string
        :param unit_post_handler: post handler producing the storage key;
            the key should be the expected artifact name (prefix/extension
            applied)
        :param resource_field: for resource targets, the field naming the
            files, eg "files" for files = ["a.txt", "b.txt"]
        :param S2MPostHandler: handler that stores one target as several
            result entries
        """
        if target_type not in result_dict.keys():
            result_dict[target_type] = dict()
        self.project_path = project_path
        self.result_dict = result_dict
        self.target_type = target_type
        self.match_pattern = match_pattern
        self.gn_file_line_no_dict = gn_lineno_collect(
            self.match_pattern, self.project_path)
        self.sc_dict = sub_com_dict
        self.target_name_parser = target_name_parser
        self.other_info_handlers = other_info_handlers
        self.unit_post_handler = unit_post_handler
        self.resource_field = resource_field
        self.S2MPostHandler = S2MPostHandler

    def _append(self, key: str, unit: Dict) -> None:
        """
        Store one target into the final result dict.
        :param key: storage key; should be the expected artifact file name
        :param unit: the target info to store
        :return: None
        """
        self.result_dict.get(self.target_type)[key] = unit

    def _find_sc(self, gn_path: str):
        """Look up the subsystem/component owning *gn_path* by configured
        path prefix. Both paths must be absolute."""
        if not gn_path.startswith(self.project_path):
            logging.error("gn_path and project_path is not consistent: gn_path={}, project_path={}".format(
                gn_path, self.project_path))
            return "", ""
        rel_path = gn_path.replace(self.project_path, "").lstrip(os.sep)
        # Fix: the loop variable previously shadowed the lookup key, making
        # the startswith test compare the key against itself (always true)
        # and return the first entry regardless of the path.
        for prefix, sc_info in self.sc_dict.items():
            if rel_path.startswith(prefix):
                return sc_info.get("subsystem"), sc_info.get("component")
        return "", ""

    @abstractmethod
    def run(self):
        ...

    def __call__(self, *args, **kwargs):
        self.run()
class DefaultProcessor(BaseProcessor):
    """Collector for ordinary targets: artifact name comes from output_name
    (falling back to the target name), subsystem/component come from the gn
    attributes (falling back to the path-derived defaults)."""

    def helper(self, target_name: str, paragraph: str, gn_path: str, line_no: int, _sub: str, _com: str) -> Tuple[str]:
        """Parse one target paragraph and store the result.

        :param target_name: resolved target name
        :param paragraph: full text of the target block
        :param gn_path: absolute path of the BUILD.gn file
        :param line_no: line where the target starts
        :param _sub: fallback subsystem name (path-derived)
        :param _com: fallback component name (path-derived)
        """
        output_name = GnVariableParser.string_parser("output_name", paragraph)
        if output_name.strip('"'):
            if GnCommonTool.contains_gn_variable(output_name):
                output_name = GnCommonTool.replace_gn_variables(
                    output_name, gn_path, self.project_path).strip('"')
                out_from = "output_name"
            else:
                output_name = output_name.strip('"')
                # NOTE(review): a literal output_name is tagged as coming
                # from "target_name" -- looks inconsistent; confirm intent.
                out_from = "target_name"
        else:
            output_name = target_name
            out_from = "target_name"
        sub = GnVariableParser.string_parser("subsystem_name", paragraph)
        com = GnVariableParser.string_parser("part_name", paragraph)
        if sub.strip('"'):
            if GnCommonTool.contains_gn_variable(sub):
                sub = GnCommonTool.replace_gn_variables(
                    sub, gn_path, self.project_path).strip('"')
            else:
                sub = sub.strip('"')
            sub_from = "gn"
        else:
            sub = _sub
            sub_from = "json"
        if com.strip('"'):
            if GnCommonTool.contains_gn_variable(com):
                com = GnCommonTool.replace_gn_variables(
                    com, gn_path, self.project_path).strip('"')
            else:
                com = com.strip('"')
            com_from = "gn"
        else:
            com = _com
            com_from = "json"
        result = {
            "gn_path": gn_path,
            "target_type": self.target_type,
            "line_no": line_no,
            "subsystem_name": sub,
            "component_name": com,
            "subsystem_from": sub_from,
            "component_from": com_from,
            "target_name": target_name,
            "output_name": output_name,
            "output_from": out_from,
        }
        # collect any extra configured attributes (eg output_extension)
        for k, h in self.other_info_handlers.items():
            result[k] = h(paragraph)
        key = self.unit_post_handler(result)
        self._append(key, result)
        if self.S2MPostHandler:
            self.S2MPostHandler(result, self.result_dict)

    def run(self):
        """Iterate every BUILD.gn reported by grep for this target type and
        feed each matched paragraph to ``helper``."""
        for gn_path, line_no_list in self.gn_file_line_no_dict.items():
            # Default subsystem/component for this path; used when the
            # target itself does not specify them.
            _sub, _com = self._find_sc(gn_path)
            with open(gn_path, 'r', encoding='utf-8') as f:
                content = f.read()
                itr = BasicTool.match_paragraph(
                    content, start_pattern=self.target_type)
                for line_no, p in zip(line_no_list, itr):
                    paragraph = p.group()
                    target_name = self.target_name_parser(paragraph).strip('"')
                    if not target_name:
                        continue
                    if GnCommonTool.contains_gn_variable(target_name, quote_processed=True):
                        # the name is a variable: try every possible value
                        possible_name_list = GnCommonTool.find_values_of_variable(target_name, path=gn_path,
                                                                                  stop_tail=self.project_path)
                        for n in possible_name_list:
                            self.helper(n, paragraph, gn_path,
                                        line_no, _sub, _com)
                    else:
                        self.helper(target_name, paragraph,
                                    gn_path, line_no, _sub, _com)
class StrResourceProcessor(DefaultProcessor):
    """Collector for resource-like targets whose resource field is a single
    string (eg source = "a.txt"); the artifact name is that file's base name."""

    def helper(self, target_name: str, paragraph: str, gn_path: str, line_no: int, _sub: str, _com: str) -> Tuple[str]:
        """Parse one resource target paragraph and store the result.
        Skips the target when the resource field is empty."""
        resources = GnVariableParser.string_parser(
            self.resource_field, paragraph)
        if not resources.strip('"'):
            return
        if GnCommonTool.contains_gn_variable(resources):
            resources = GnCommonTool.replace_gn_variables(
                resources, gn_path, self.project_path).strip('"')
            # FIXME if the variable value was not found (eg due to line
            # wrapping), fall back to the target name as the resource
            if GnCommonTool.contains_gn_variable(resources):
                resources = target_name
        else:
            resources = resources.strip('"')
        sub = GnVariableParser.string_parser("subsystem_name", paragraph)
        com = GnVariableParser.string_parser("part_name", paragraph)
        # NOTE(review): unlike DefaultProcessor this tests `if sub:` rather
        # than `if sub.strip('"')` -- confirm an empty quoted value cannot
        # occur here.
        if sub:
            if GnCommonTool.contains_gn_variable(sub):
                sub = GnCommonTool.replace_gn_variables(
                    sub, gn_path, self.project_path).strip('"')
            else:
                sub = sub.strip('"')
            sub_from = "gn"
        else:
            sub = _sub
            sub_from = "json"
        if com:
            if GnCommonTool.contains_gn_variable(com):
                com = GnCommonTool.replace_gn_variables(
                    com, gn_path, self.project_path).strip('"')
            else:
                com = com.strip('"')
            com_from = "gn"
        else:
            com = _com
            com_from = "json"
        _, file_name = os.path.split(resources)
        result = {
            "gn_path": gn_path,
            "target_type": self.target_type,
            "line_no": line_no,
            "subsystem_name": sub,
            "component_name": com,
            "subsystem_from": sub_from,
            "component_from": com_from,
            "target_name": target_name,
            "output_name": file_name,
            "output_from": "file_name",
        }
        for k, h in self.other_info_handlers.items():
            result[k] = h(paragraph)
        key = self.unit_post_handler(result)
        self._append(key, result)
class ListResourceProcessor(DefaultProcessor):
    """Collector for resource-like targets whose resource field is a list
    (eg files = ["a.txt", "b.txt"]); one result entry is stored per file."""

    def helper(self, target_name: str, paragraph: str, gn_path: str, line_no: int, _sub: str, _com: str) -> Tuple[str]:
        """Parse one resource target paragraph and store one result per
        listed file. Skips the target when the list is empty."""
        resources = GnVariableParser.list_parser(
            self.resource_field, paragraph)
        if not resources:
            return
        sub = GnVariableParser.string_parser("subsystem_name", paragraph)
        com = GnVariableParser.string_parser("part_name", paragraph)
        # NOTE(review): same `if sub:` test as StrResourceProcessor -- see
        # the note there about empty quoted values.
        if sub:
            if GnCommonTool.contains_gn_variable(sub):
                sub = GnCommonTool.replace_gn_variables(
                    sub, gn_path, self.project_path).strip('"')
            else:
                sub = sub.strip('"')
            sub_from = "gn"
        else:
            sub = _sub
            sub_from = "json"
        if com:
            if GnCommonTool.contains_gn_variable(com):
                com = GnCommonTool.replace_gn_variables(
                    com, gn_path, self.project_path).strip('"')
            else:
                com = com.strip('"')
            com_from = "gn"
        else:
            com = _com
            com_from = "json"
        for ff in resources:
            _, file_name = os.path.split(ff)
            result = {
                "gn_path": gn_path,
                "target_type": self.target_type,
                "line_no": line_no,
                "subsystem_name": sub,
                "component_name": com,
                "subsystem_from": sub_from,
                "component_from": com_from,
                "target_name": target_name,
                "output_name": file_name,
                "output_from": "file_name",
            }
            for k, h in self.other_info_handlers.items():
                result[k] = h(paragraph)
            key = self.unit_post_handler(result)
            self._append(key, result)
if __name__ == '__main__':
    # No standalone behavior; this module is driven by rom_analysis.py.
    ...

View File

@ -85,6 +85,7 @@
1. 获取整个rom_ram_analyzer目录
2. hdc可用
3. 设备已连接
4. 系统已烧录
5. python3.8及以后
6. 安装requirements
```txt

View File

View File

@ -9,7 +9,7 @@ import subprocess
import typing
import xml.dom.minidom as dom
from packages.simple_excel_writer import SimpleExcelWriter
from pkgs.simple_excel_writer import SimpleExcelWriter
debug = True if sys.gettrace() else False

View File

@ -6,9 +6,9 @@ import typing
from copy import deepcopy
from typing import *
from packages.basic_tool import BasicTool
from packages.gn_common_tool import GnCommonTool
from packages.simple_excel_writer import SimpleExcelWriter
from pkgs.basic_tool import BasicTool
from pkgs.gn_common_tool import GnCommonTool
from pkgs.simple_excel_writer import SimpleExcelWriter
debug = bool(sys.gettrace())