移除typing.Literal,从而使ram_analyzer.py支持3.8以下python

Signed-off-by: aodongbiao <aodongbiao@huawei.com>
This commit is contained in:
aodongbiao 2023-02-08 16:47:23 +08:00
parent ed62bc536b
commit 8d29309604
8 changed files with 256 additions and 30 deletions

View File

@ -44,5 +44,4 @@
## 后续工作
1. 部分log的输出有待优化
1. hap_pack需要对hap_name进行处理
1. 部分log的输出有待优化

View File

@ -22,8 +22,8 @@ def parse_args():
help="product name. eg: -p ipcamera_hispark_taurus")
parser.add_argument("-o", "--oh_path", type=str,
default=".", help="root path of openharmony")
parser.add_argument("-r", "--recollect_gn", type=bool,
default=True, help="if recollect gn info or not")
parser.add_argument("-g", "--recollect_gn", action="store_false", help="recollect gn info or not")
parser.add_argument("-s", "--recollect_sc", action="store_false", help="recollect subsystem_component info or not")
args = parser.parse_args()
return args
@ -37,11 +37,16 @@ result_dict: Dict[str, Any] = dict()
project_path = BasicTool.abspath(_args.oh_path)
product_name = _args.product_name
recollect_gn = _args.recollect_gn
_recollect_sc = _args.recollect_sc
_sc_json: Dict[Text, Text] = configs.get("subsystem_component")
_sc_save = _sc_json.get("save")
_target_type = configs["target_type"]
_sc_output_path = _sc_json.get("filename")
sub_com_dict: Dict = SC.run(project_path, _sc_output_path, _sc_save)
if _recollect_sc:
sub_com_dict: Dict = SC.run(project_path, _sc_output_path, _sc_save)
else:
with open(_sc_output_path, 'r', encoding='utf-8') as f:
sub_com_dict = json.load(f)
collector_config: Tuple[BaseProcessor] = (
DefaultProcessor(project_path=project_path, # 项目根路径
@ -123,7 +128,7 @@ collector_config: Tuple[BaseProcessor] = (
"extension": extension_handler,
},
unit_post_handler=LiteLibPostHandler(),
S2MPostHandler=LiteLibS2MPostHandler,
ud_post_handler=LiteLibS2MPostHandler,
),
DefaultProcessor(project_path=project_path, # hap有个hap_name
result_dict=result_dict,

View File

@ -27,6 +27,7 @@ target_type:
- ohos_sa_profile
- ohos_prebuilt_shared_library
- lite_component
subsystem_component:
save: true
@ -49,11 +50,13 @@ ipcamera_hispark_taurus:
- ohos_shared_library
- ohos_prebuilt_shared_library
- lite_library
- lite_component
- lite_component
bin:
- executable
- ohos_executable
- lite_component
ipcamera_hispark_taurus_linux:
product_infofile: ipcamera_hispark_taurus_linux_product.json
@ -72,10 +75,12 @@ ipcamera_hispark_taurus_linux:
- ohos_prebuilt_shared_library
- lite_library
- lite_component
bin:
- executable
- ohos_executable
- lite_component
wifiiot_hispark_pegasus:
product_infofile: wifiiot_hispark_pegasus_product.json
@ -91,6 +96,7 @@ wifiiot_hispark_pegasus:
- static_library
- ohos_static_library
- lite_library
rk3568: # rk的目前从packages/phone/system_module_info.json中分析准确度更高,因为rk基本都使用的是ohos_xxx,而L0和L1的更多的是使用的gn原生target template
product_infofile: rk3568_product.json
@ -110,10 +116,12 @@ rk3568: # rk的目前从packages/phone/system_module_info.json中分析准确度
- ohos_prebuilt_shared_library
- lite_library
- lite_component
bin:
- ohos_executable
- executable
- lite_component
hap:
- ohos_hap

View File

@ -0,0 +1,228 @@
import logging
import copy
import os
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import *
import preprocess
from pkgs.gn_common_tool import GnVariableParser
from pkgs.simple_yaml_tool import SimpleYamlTool
from pkgs.basic_tool import BasicTool
_config = SimpleYamlTool.read_yaml("config.yaml")
"""
===============info handlers===============
"""
def extension_handler(paragraph: Text):
return GnVariableParser.string_parser("output_extension", paragraph).strip('"')
def hap_name_handler(paragraph: Text):
return GnVariableParser.string_parser("hap_name", paragraph).strip('"')
def target_type_handler(paragraph: Text):
tt = GnVariableParser.string_parser("target_type", paragraph).strip('"')
if not tt:
logging.info("parse 'target_type' failed, maybe it's a variable")
return tt
"""
===============gn lineno collector===============
"""
def gn_lineno_collect(match_pattern: str, project_path: str) -> DefaultDict[str, List[int]]:
"""
在整个项目路径下搜索有特定target类型的BUILD.gn
:param match_pattern: 进行grep的pattern支持扩展的正则
:param project_path: 项目路径搜索路径
:return: {gn_file: [line_no_1, line_no_2, ..]}
"""
black_list = map(lambda x: os.path.join(
project_path, x), _config.get("black_list"))
def handler(content: Text) -> List[str]:
return list(filter(lambda y: len(y) > 0, list(map(lambda x: x.strip(), content.split("\n")))))
grep_list = BasicTool.grep_ern(match_pattern, path=project_path, include="BUILD.gn", exclude=tuple(black_list),
post_handler=handler)
gn_line_dict: DefaultDict[str, List[int]] = defaultdict(list)
for gl in grep_list:
gn_file, line_no, _ = gl.split(":")
gn_line_dict[gn_file].append(line_no)
return gn_line_dict
"""
===============target name parser===============
"""
class TargetNameParser:
@classmethod
def single_parser(cls, paragraph: Text) -> str:
"""
查找类似shared_library("xxx")这种括号内只有一个参数的target的名称
:param paragraph: 要解析的段落
:return: target名称如果是变量不会对其进行解析
"""
return BasicTool.re_group_1(paragraph, r"\w+\((.*)\)")
@classmethod
def second_parser(cls, paragraph: Text) -> str:
"""
查找类似target("shared_library","xxx")这种的target名称括号内第二个参数
:param paragraph: 要解析的段落
:return: target名称如果是变量不会的其进行解析
"""
return BasicTool.re_group_1(paragraph, r"\w+\(.*?, *(.*?)\)")
"""
===============post handlers===============
"""
class BasePostHandler(ABC):
@abstractmethod
def run(self, unit: Dict[str, AnyStr]) -> str:
...
def __call__(self, unit: Dict[str, AnyStr]) -> str:
return self.run(unit)
class DefaultPostHandler(BasePostHandler):
def run(self, unit: Dict[str, AnyStr]):
return unit["output_name"]
class HAPPostHandler(BasePostHandler):
"""
for ohos_hap"""
def run(self, unit: Dict[str, AnyStr]):
extension = _config.get("default_extension").get("app")
gn_hap_name = unit.get("hap_name")
if gn_hap_name:
return gn_hap_name+extension
return unit["output_name"]+extension
class SOPostHandler(BasePostHandler):
"""
for shared_library"""
def run(self, unit: Dict[str, AnyStr]):
output_name = unit["output_name"]
prefix = _config.get("default_prefix").get("shared_library")
if unit.get("extension"):
extension = unit.get("extension")
else:
extension = _config.get("default_extension").get("shared_library")
if not extension.startswith('.'):
extension = '.'+extension
if output_name.startswith(prefix):
return output_name+extension
return prefix+output_name+extension
class APostHandler(BasePostHandler):
"""
for static library"""
def run(self, unit: Dict[str, AnyStr]):
output_name = unit["output_name"]
prefix = _config.get("default_prefix").get("static_library")
extension: str = _config.get("default_extension").get("static_library")
if not extension.startswith('.'):
extension = '.'+extension
if output_name.startswith(prefix):
return output_name+extension
return prefix+output_name+extension
class LiteLibPostHandler(BasePostHandler):
"""
for lite_library"""
def run(self, unit: Dict[str, AnyStr]):
tp = unit["real_target_type"]
output_name = unit["output_name"]
if tp == "static_library":
prefix = _config.get("default_prefix").get("static_library")
extension = _config.get("default_extension").get("static_library")
elif tp == "shared_library":
prefix = _config.get("default_prefix").get("shared_library")
extension = _config.get("default_extension").get("shared_library")
else:
prefix = str()
extension = str()
if not extension.startswith('.'):
extension = '.'+extension
if output_name.startswith(prefix):
return output_name+extension
return prefix+output_name+extension
class LiteComponentPostHandler(BasePostHandler):
"""
for lite_component"""
def run(self, unit: Dict[str, AnyStr]):
tp = unit["real_target_type"]
output_name = unit["output_name"]
extension = unit.get("output_extension")
if tp == "shared_library":
prefix = _config.get("default_prefix").get("shared_library")
extension = _config.get("default_extension").get("shared_library")
else:
if tp != "executable":
unit["description"] = "virtual node"
prefix = str()
extension = str()
if not extension.startswith('.'):
extension = '.'+extension
return prefix+output_name+extension
class TargetPostHandler(BasePostHandler):
"""
for target(a,b){}"""
def run(self, unit: Dict[str, AnyStr]):
...
def LiteLibS2MPostHandler(unit: Dict, result_dict: Dict) -> None:
rt = unit.get("real_target_type")
new_unit = copy.deepcopy(unit)
if rt == "shared_library":
new_unit["real_target_type"] = "static_library"
k = LiteLibPostHandler()(new_unit)
new_unit["description"] = "may not exist"
result_dict["lite_library"][k] = new_unit
elif rt == "static_library":
new_unit["real_target_type"] = "shared_library"
k = LiteLibPostHandler()(new_unit)
new_unit["description"] = "may not exist"
result_dict["lite_library"][k] = new_unit
else:
logging.warning(
f"target type should be 'shared_library' or 'static_library', but got '{rt}'")
new_unit["real_target_type"] = "shared_library"
k = LiteLibPostHandler()(new_unit)
new_unit["description"] = "may not exist"
result_dict["lite_library"][k] = new_unit
new_new_unit = copy.deepcopy(unit)
new_new_unit["real_target_type"] = "static_library"
k = LiteLibPostHandler()(new_new_unit)
new_new_unit["description"] = "may not exist"
result_dict["lite_library"][k] = new_new_unit

View File

@ -13,10 +13,8 @@
# limitations under the License.
#
from threading import RLock
from typing import *
from abc import ABC, abstractmethod
from collections import defaultdict
import os
import logging
@ -49,7 +47,7 @@ class BaseProcessor(ABC):
Text], Union[str, list]]] = dict(),
unit_post_handler: BasePostHandler = do_nothing,
resource_field: str = None,
S2MPostHandler: Callable[[Dict, Dict], None] = None
ud_post_handler: Callable[[Dict, Dict], None] = None
):
"""
:param project_path: 项目根路径
@ -62,7 +60,7 @@ class BaseProcessor(ABC):
SourceParser是对target段落进行分析处理的Callable接受一个字符串作为参数
:param unit_post_handler: 对最终要存储的结果字典进行后处理应当返回一个字符串作为存储时的key且该key应为预期产物去除前后缀后的名字
:resource_field: 针对资源类target,资源字段,如files = ["a.txt","b.txt"],则field为files
:S2MPostHandler: 将一个target保存为多个多个的处理器
:ud_post_handler: 参数为unit和result_dict的handler
"""
if target_type not in result_dict.keys():
result_dict[target_type] = dict()
@ -77,7 +75,7 @@ class BaseProcessor(ABC):
self.other_info_handlers = other_info_handlers
self.unit_post_handler = unit_post_handler
self.resource_field = resource_field
self.S2MPostHandler = S2MPostHandler
self.ud_post_handler = ud_post_handler
def _append(self, key: str, unit: Dict) -> None:
"""
@ -150,8 +148,8 @@ class DefaultProcessor(BaseProcessor):
result[k] = h(paragraph)
key = self.unit_post_handler(result)
self._append(key, result)
if self.S2MPostHandler:
self.S2MPostHandler(result, self.result_dict)
if self.ud_post_handler:
self.ud_post_handler(result, self.result_dict)
def run(self):
for gn_path, line_no_list in self.gn_file_line_no_dict.items():
@ -181,17 +179,6 @@ class StrResourceProcessor(DefaultProcessor):
def helper(self, target_name: str, paragraph: str, gn_path: str, line_no: int, _sub: str, _com: str) -> Tuple[str]:
resources = GnVariableParser.string_parser(
self.resource_field, paragraph)
# if not resources.strip('"'):
# return
# if GnCommonTool.contains_gn_variable(resources):
# resources = GnCommonTool.replace_gn_variables(
# resources, gn_path, self.project_path).strip('"')
# # FIXME 如果出现换行导致的在replace_gn_variables里面没有查找到变量的对应值,则直接取target_name作为resources
# if GnCommonTool.contains_gn_variable(resources):
# resources = target_name
# else:
# resources = resources.strip('"')
################start
if not resources:
return
_, resources = os.path.split(resources.strip('"'))
@ -199,7 +186,6 @@ class StrResourceProcessor(DefaultProcessor):
if GnCommonTool.contains_gn_variable(resources):
resources = GnCommonTool.replace_gn_variables(
resources, gn_path, self.project_path).strip('"')
################end
sub = GnVariableParser.string_parser("subsystem_name", paragraph)
com = GnVariableParser.string_parser("part_name", paragraph)
sub, sub_from = _gn_var_process(self.project_path, sub, _sub, gn_path, "gn", "json")

View File

@ -117,7 +117,7 @@
-n DEVICE_NUM, --device_num DEVICE_NUM
device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800
-o OUTPUT_FILENAME, --output_filename OUTPUT_FILENAME
base name of output file, default: rom_analysis_result. eg: -o ram_analysis_result
base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result
-e EXCEL, --excel EXCEL
if output result as excel, default: False. eg: -e True
```

View File

@ -1,6 +1,7 @@
import sys
import typing
import os
import glob
from pathlib import Path
from typing import *

View File

@ -39,10 +39,9 @@ class HDCTool:
stderr = str(cp.stderr)
return device_num in stderr or device_num in stdout
__MODE = typing.Literal["stdout", "stderr"]
@classmethod
def exec(cls, args: list, output_from: __MODE = "stdout"):
def exec(cls, args: list, output_from: str = "stdout"):
cp = subprocess.run(args, capture_output=True)
if output_from == "stdout":
return cp.stdout.decode()
@ -408,7 +407,7 @@ def get_args():
parser.add_argument("-n", "--device_num", type=str, required=True,
help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
help="base name of output file, default: rom_analysis_result. eg: -o ram_analysis_result")
help="base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result")
parser.add_argument("-e", "--excel", type=bool, default=False,
help="if output result as excel, default: False. eg: -e True")
args = parser.parse_args()