Signed-off-by: 黄国辉 <huangguohui6@huawei.com>

Signed-off-by: 黄国辉 <huangguohui6@huawei.com>
This commit is contained in:
黄国辉 2024-07-02 09:05:51 +00:00 committed by Gitee
parent f882bcaaf0
commit 0fee664fec
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
27 changed files with 1620 additions and 1628 deletions

View File

@ -122,6 +122,45 @@ class DeviceShell:
return device_para
@classmethod
def execute_command(cls, command, print_flag=True, timeout=900):
try:
if print_flag:
print("command: " + command)
if subprocess.call(command, shell=True, timeout=timeout) == 0:
print("results: successed")
return True
except Exception as error:
print("Exception: %s" % str(error))
print("results: failed")
return False
@classmethod
def execute_command_with_output(cls, command, print_flag=True):
if print_flag:
print("command: " + command)
proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
result = ""
try:
data, _ = proc.communicate()
if isinstance(data, bytes):
result = data.decode('utf-8', 'ignore')
finally:
proc.stdout.close()
proc.stderr.close()
return result if result else data
@classmethod
def check_path_legal(cls, path):
if path and " " in path:
return "\"%s\"" % path
return path
def remount(self):
if self.conn_type:
remount = "target mount"
@ -185,19 +224,6 @@ class DeviceShell:
command,
QUOTATION_MARKS))
@classmethod
def execute_command(cls, command, print_flag=True, timeout=900):
try:
if print_flag:
print("command: " + command)
if subprocess.call(command, shell=True, timeout=timeout) == 0:
print("results: successed")
return True
except Exception as error:
print("Exception: %s" % str(error))
print("results: failed")
return False
def shell_with_output(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
HDC_TOOLS,
@ -205,29 +231,4 @@ class DeviceShell:
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
@classmethod
def execute_command_with_output(cls, command, print_flag=True):
if print_flag:
print("command: " + command)
proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
result = ""
try:
data, _ = proc.communicate()
if isinstance(data, bytes):
result = data.decode('utf-8', 'ignore')
finally:
proc.stdout.close()
proc.stderr.close()
return result if result else data
@classmethod
def check_path_legal(cls, path):
if path and " " in path:
return "\"%s\"" % path
return path

View File

@ -32,6 +32,23 @@ class DeviceManager:
self.tv_device_list = []
self.watch_device_list = []
self.make_device_list(result_path)
@staticmethod
def get_device_info_list(result):
device_info_list = []
tmp_path = os.path.join(result, "temp")
device_info_file_path = os.path.join(tmp_path,
"device_info_file.txt")
if os.path.exists(device_info_file_path):
with open(device_info_file_path, "r") as file_handle:
lines = file_handle.readlines()
for line in lines:
line = line.replace("\n", "")
line = line.strip()
temp = line.split(",")
device_info_list.append(temp)
return device_info_list
def make_device_adapter(self, device_info_list, device_name):
device = DeviceShell(self.is_hdc, device_sn=device_info_list[0],
@ -71,22 +88,5 @@ class DeviceManager:
setattr(self, device.name, device)
return
@staticmethod
def get_device_info_list(result):
device_info_list = []
tmp_path = os.path.join(result, "temp")
device_info_file_path = os.path.join(tmp_path,
"device_info_file.txt")
if os.path.exists(device_info_file_path):
with open(device_info_file_path, "r") as file_handle:
lines = file_handle.readlines()
for line in lines:
line = line.replace("\n", "")
line = line.strip()
temp = line.split(",")
device_info_list.append(temp)
return device_info_list
##############################################################################
##############################################################################

View File

@ -85,40 +85,6 @@ class Distribute:
self.agent_list = agent_list
self.hdc_tools = hdc_tools
def exec_agent(self, device, target_name, result_path, options):
driver = get_current_driver(device, target_name, self.hdc_tools)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_agent_desc_file(device)
device.push_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
driver.execute(suite_path, result_path, options, background=True)
return self._check_thread(device, target_name)
def exec_major(self, device, target_name, result_path, options):
driver = get_current_driver(device, target_name, self.hdc_tools)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_major_desc_file()
device.push_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
return driver.execute(suite_path, result_path, options, background=False)
@staticmethod
def pull_result(device, source_path, result_save_path):
_, file_name = os.path.split(source_path)
@ -141,75 +107,6 @@ class Distribute:
break
return True if checksum < 100 else False
def _make_agent_desc_file(self, device):
agent_ip_list = ""
device_uuid_list = ""
if self.hdc_tools != "hdc":
if self._query_device_uuid(self.major) != '':
device_uuid_list += self._query_device_uuid(self.major) + ","
if self._query_device_ip(device) != "":
agent_ip_list += self._query_device_ip(device) + ","
for agent in self.agent_list:
if self._query_device_uuid(agent):
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
else:
if self._query_device_agent_uuid(self.major):
device_uuid_list += self._query_device_agent_uuid(self.major) + ","
if self._query_device_hdc_ip(device):
agent_ip_list += self._query_device_hdc_ip(device) + ","
for agent in self.agent_list:
if self._query_device_agent_uuid(agent):
device_uuid_list += self._query_device_agent_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
def _make_major_desc_file(self):
agent_ip_list = ""
device_uuid_list = ""
if self.hdc_tools != "hdc":
if self._query_device_uuid(self.major) != "NoneTyple":
device_uuid_list += self._query_device_uuid(self.major) + ","
for agent in self.agent_list:
if self._query_device_ip(agent) != "" and self._query_device_uuid(agent) != "":
agent_ip_list += self._query_device_ip(agent) + ","
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
else:
if self._query_device_major_uuid(self.major):
device_uuid_list += self._query_device_major_uuid(self.major) + ","
for agent in self.agent_list:
if self._query_device_major_uuid(agent):
agent_ip_list += self._query_device_hdc_ip(agent) + ","
device_uuid_list += self._query_device_major_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
@staticmethod
def _query_device_ip(device):
output = device.shell_with_output("getprop ro.hardware")
@ -317,7 +214,108 @@ class Distribute:
file_desc.write(device_info)
os.rename(file_path, final_file)
def exec_agent(self, device, target_name, result_path, options):
driver = get_current_driver(device, target_name, self.hdc_tools)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_agent_desc_file(device)
device.push_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
driver.execute(suite_path, result_path, options, background=True)
return self._check_thread(device, target_name)
def exec_major(self, device, target_name, result_path, options):
driver = get_current_driver(device, target_name, self.hdc_tools)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_major_desc_file()
device.push_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
return driver.execute(suite_path, result_path, options, background=False)
def _make_agent_desc_file(self, device):
agent_ip_list = ""
device_uuid_list = ""
if self.hdc_tools != "hdc":
if self._query_device_uuid(self.major) != '':
device_uuid_list += self._query_device_uuid(self.major) + ","
if self._query_device_ip(device) != "":
agent_ip_list += self._query_device_ip(device) + ","
for agent in self.agent_list:
if self._query_device_uuid(agent):
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
else:
if self._query_device_agent_uuid(self.major):
device_uuid_list += self._query_device_agent_uuid(self.major) + ","
if self._query_device_hdc_ip(device):
agent_ip_list += self._query_device_hdc_ip(device) + ","
for agent in self.agent_list:
if self._query_device_agent_uuid(agent):
device_uuid_list += self._query_device_agent_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
def _make_major_desc_file(self):
agent_ip_list = ""
device_uuid_list = ""
if self.hdc_tools != "hdc":
if self._query_device_uuid(self.major) != "NoneTyple":
device_uuid_list += self._query_device_uuid(self.major) + ","
for agent in self.agent_list:
if self._query_device_ip(agent) != "" and self._query_device_uuid(agent) != "":
agent_ip_list += self._query_device_ip(agent) + ","
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
else:
if self._query_device_major_uuid(self.major):
device_uuid_list += self._query_device_major_uuid(self.major) + ","
for agent in self.agent_list:
if self._query_device_major_uuid(agent):
agent_ip_list += self._query_device_hdc_ip(agent) + ","
device_uuid_list += self._query_device_major_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
##############################################################################
##############################################################################

View File

@ -29,6 +29,7 @@ from _core.logger import platform_logger
__all__ = ["BenchmarkReporter"]
LOG = platform_logger("BenchmarkReporter")
@Plugin(type=Plugin.REPORTER, id=TestType.benchmark)
class BenchmarkReporter(IReporter):

View File

@ -27,6 +27,7 @@ MODES = stat.S_IWUSR | stat.S_IRUSR
SETTING_RED_STYLE = """\033[33;31m%s\033[0m"""
def load_json_data(json_file_path):
json_data = {}
if os.path.isfile(json_file_path):
@ -68,6 +69,7 @@ def get_file_list_by_postfix(path, postfix, filter_jar=""):
file_list.append(file_path)
return file_list
class BenchmarkReport(object):
SUBSYSTEM_SUMMARY = "OHOS_SUBSYSTEM_SUMMARY"
ENABLE_LINK = "OHOS_ENABLE_PASSCASE_LINK"
@ -88,24 +90,7 @@ class BenchmarkReport(object):
self.sbs_mdl_summary_list = []
self.benchmark_list = []
self._init_default_item()
def _init_default_item(self):
self.default_item.append("Subsystem")
self.default_item.append("Module")
self.default_item.append("Testsuit")
self.default_item.append("Benchmark")
self.default_item.append("Mode")
self.default_item.append("RunType")
self.default_item.append("TestTargetName")
self.default_item.append("TestTargetMethod")
self.default_item.append("Repetitions")
self.default_item.append("RepetitionIndex")
self.default_item.append("Threads")
self.default_item.append("Iterations")
self.default_item.append("Score")
self.default_item.append("CpuTime")
self.max_index = len(self.default_item) + 1000
def generate_benchmark(self, args):
if args is None or len(args) <= 2:
print(SETTING_RED_STYLE %
@ -129,6 +114,23 @@ class BenchmarkReport(object):
self._generate_benchmark_summary_report(os.path.abspath(dest_path))
self._generate_all_benchmark_detail(os.path.abspath(dest_path))
def _init_default_item(self):
self.default_item.append("Subsystem")
self.default_item.append("Module")
self.default_item.append("Testsuit")
self.default_item.append("Benchmark")
self.default_item.append("Mode")
self.default_item.append("RunType")
self.default_item.append("TestTargetName")
self.default_item.append("TestTargetMethod")
self.default_item.append("Repetitions")
self.default_item.append("RepetitionIndex")
self.default_item.append("Threads")
self.default_item.append("Iterations")
self.default_item.append("Score")
self.default_item.append("CpuTime")
self.max_index = len(self.default_item) + 1000
def _remove_iterations(self, mdl_summary_list):
final_mdl_summary = []
for item_info in mdl_summary_list:

View File

@ -166,7 +166,7 @@ def generate(args):
#complie fuzzer project
def make(args, stdout=None):
def make(args, stdout=None):
"""make fuzzer module."""
color_logger = Colored.get_project_logger()

View File

@ -56,6 +56,26 @@ class Colored(object):
self.log_project = log_project
self.log_date = time.strftime("%Y%m%d%H%M%S", time.localtime())
@staticmethod
def get_fuzz_log_dir():
return Colored.LOG_DIR
@staticmethod
def log_task_init(project):
Colored.LOG_TO_FILE = True
Colored.LOG_PROJECT = project
Colored.LOG_DATE = time.strftime("%Y%m%d%H%M%S", time.localtime())
if not os.path.exists(Colored.LOG_DIR):
os.mkdir(Colored.LOG_DIR)
project_log_dir = Colored.get_fuzz_project_log_dir()
if not os.path.exists(project_log_dir):
os.mkdir(project_log_dir)
current_project_log_dir = Colored.get_fuzz_current_project_log_dir()
if not os.path.exists(current_project_log_dir):
os.mkdir(current_project_log_dir)
def start_log_file(self):
self.is_log_file = True
@ -100,7 +120,7 @@ class Colored(object):
Colored.RESET
)
else:
msg = '{}{}{}'.format(
msg = '{}{}{}'.format(
getattr(Colored, color),
s,
Colored.RESET
@ -137,25 +157,3 @@ class Colored(object):
def simple_print(self, s):
self.loghook(s)
print(s)
@staticmethod
def get_fuzz_log_dir():
return Colored.LOG_DIR
@staticmethod
def log_task_init(project):
Colored.LOG_TO_FILE = True
Colored.LOG_PROJECT = project
Colored.LOG_DATE = time.strftime("%Y%m%d%H%M%S", time.localtime())
if not os.path.exists(Colored.LOG_DIR):
os.mkdir(Colored.LOG_DIR)
project_log_dir = Colored.get_fuzz_project_log_dir()
if not os.path.exists(project_log_dir):
os.mkdir(project_log_dir)
current_project_log_dir = Colored.get_fuzz_current_project_log_dir()
if not os.path.exists(current_project_log_dir):
os.mkdir(current_project_log_dir)

View File

@ -56,9 +56,6 @@ class RunResult():
"report_progress": 0
}
def get_log(self):
return "code :{}, msg: {}".format(self.code, self.data)
@staticmethod
def filter_log(log_str):
ansi_escape = re.compile(r'''
@ -75,6 +72,9 @@ class RunResult():
result = ansi_escape.sub('', log_str)
return result
def get_log(self):
return "code :{}, msg: {}".format(self.code, self.data)
def analysis(self, result, outdir):
pass

View File

@ -186,7 +186,7 @@ REPORT_CSS_TEMPLATE = """
def render_tbody(data):
res = ""
res = ""
for row in data:
row_line = "<tr>"
for row_td in row:

View File

@ -202,7 +202,7 @@ def gen_subsystem_trace_info(subsystem, data_dir, test_dir, lcovrc_path):
def cut_info(subsystem, test_dir):
trace_file = os.path.join(
CODEPATH, REPORT_PATH, "single_test",
test_dir, f"{subsystem}_output.info"
test_dir, f"{subsystem}_output.info"
)
output_name = os.path.join(
CODEPATH, REPORT_PATH, "single_test",

View File

@ -246,8 +246,7 @@ def get_para_sub_string(content):
parentheses_list_left = []
parentheses_list_right = []
for index in range(len(content)):
char = content[index]
for index, char in enumerate(content):
if "<" == char:
if 0 == len(parentheses_list_left):
start_index = index

View File

@ -16,20 +16,19 @@
# limitations under the License.
#
import os
import sys
import stat
import datetime
from importlib import reload
reload(sys)
import os
import stat
FLAGS_WRITE = os.O_WRONLY | os.O_CREAT | os.O_EXCL
FLAGS_ADD = os.O_WRONLY | os.O_APPEND | os.O_CREAT
MODES = stat.S_IWUSR | stat.S_IRUSR
html_head = """
HTML_HEAD = """
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
@ -75,12 +74,12 @@ html_head = """
</style>
</head>
"""
html_body_start = """
HTML_BODY_START = """
<body>"""
html_body_ended = """
HTML_BODY_ENDED = """
</body>"""
html_ended = """
HTML_ENDED = """
</html>"""
@ -97,8 +96,8 @@ def create_html_start(reportpath):
if os.path.exists(reportpath):
os.remove(reportpath)
with os.fdopen(os.open(reportpath, FLAGS_WRITE, MODES), 'w') as report:
report.write(html_head)
report.write(html_body_start)
report.write(HTML_HEAD)
report.write(HTML_BODY_START)
except(IOError, ValueError):
print("Error for create html start ",)
@ -244,7 +243,7 @@ def create_table_test(reportpath, subsystem_name, datalist, total_count, covered
def create_html_ended(reportpath):
try:
with os.fdopen(os.open(reportpath, FLAGS_ADD, MODES), 'a') as report:
report.write(html_body_ended)
report.write(html_ended)
report.write(HTML_BODY_ENDED)
report.write(HTML_ENDED)
except(IOError, ValueError):
print("Error for create html end")

View File

@ -26,6 +26,7 @@ import platform
import linecache
import traceback
from multiprocessing import Pool
from lxml import html
from selectolax.parser import HTMLParser
@ -94,18 +95,6 @@ class KeywordRegistration:
self.keyword_file_path = os.path.normcase(
os.path.join(os.path.dirname(__file__), "keyword.json"))
def get_keyword_info(self):
"""
获取报备关键字信息
"""
try:
with open(self.keyword_file_path, "r") as file:
keyword_dict = json.load(file)
keyword_list = keyword_dict.get("KEYWORD")
return keyword_list
except (FileNotFoundError, AttributeError, FileExistsError):
return []
@staticmethod
def get_coverage_content(file_path):
"""
@ -152,6 +141,206 @@ class KeywordRegistration:
tag = tag.replace(item, replace_item)
return tag
@staticmethod
def get_branch_line_list(keyword_line: int, branch_line_list: list):
"""
获取大于关键字行号的所有分支行号
"""
if keyword_line in branch_line_list:
index = branch_line_list.index(keyword_line)
branch_line_list = branch_line_list[index:]
else:
for line in branch_line_list:
if line > keyword_line:
index = branch_line_list.index(line)
branch_line_list = branch_line_list[index:]
break
return branch_line_list
@staticmethod
def get_keyword_judge_char(keyword, keyword_source_code):
"""
获取关键字替代字符
"""
if "&" in keyword:
keyword = keyword.replace("&", "<")
keyword_index = keyword_source_code.find(keyword)
if keyword_index == -1:
return ""
try:
keyword_code = keyword_source_code[:keyword_index + len(keyword)]
if " = " in keyword_code:
judge_key = keyword_code.split(" = ")[0].split()[-1]
else:
bracket_index = keyword_code.find("(")
bracket_code = keyword_code[:bracket_index]
judge_key = bracket_code.split()[-1]
return judge_key
except (IndexError, ValueError):
return ""
@staticmethod
def get_branch_data_by_tag(tag_html: str, symbol_status=None):
"""
根据前端标签获取分支数据
"""
if symbol_status:
key = r"#+\-*"
else:
key = r"#+\-"
branch_line_list = re.findall(rf"> ([{key}]) </span>", tag_html)
return branch_line_list
@staticmethod
def get_judge_condition_index(judge_key: str, source_code: str):
"""
获取判断条件索引
"""
keyword_index_list = []
keyword_index_list_append = keyword_index_list.append
condition_str_list = re.split(rf"\|\||&&", source_code)
for index, code_str in enumerate(condition_str_list):
if judge_key in code_str:
keyword_index_list_append(index)
return keyword_index_list, condition_str_list
@staticmethod
def update_source_code_tag(html_tag: str):
replace_item_list = ["lineNum", "lineCov", "lineNoCov"]
for item in replace_item_list:
if item in html_tag:
replace_item = (item + "Update").lower()
html_tag = html_tag.replace(item, replace_item)
return html_tag
@staticmethod
def modify_tag_style(tag, rate):
"""
修改标签样式
"""
if 75 <= rate < 90:
tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryMed")
tag = tag.replace("coverPerLo", "coverPerMed")
tag = tag.replace("coverNumLo", "coverNumMed")
elif rate >= 90:
tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryHi")
tag = tag.replace("headerCovTableEntryMed", "headerCovTableEntryHi")
tag = tag.replace("coverPerLo", "coverPerHi")
tag = tag.replace("coverNumLo", "coverNumHi")
tag = tag.replace("coverPerMed", "coverPerHi")
tag = tag.replace("coverNumMed", "coverNumHi")
return tag
@staticmethod
def _branch_replace(branch):
"""
分支符号替换
"""
if branch == "#":
branch = "> # <"
elif branch == "+":
branch = "> + <"
elif branch == "*":
branch = "> * <"
else:
branch = "> - <"
return branch
@staticmethod
def _single_condition_modify_html(branch_html, branch_list):
"""
单条件修改代码块html
"""
line_item = '<span class="lineNum"> </span>'
line_feed_index = branch_html.find(line_item)
if line_feed_index == -1:
if "+" in branch_list:
update_branch_tag = branch_html.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
else:
try:
first_branch = branch_list[0]
first_branch = "> " + first_branch + " <"
first_branch_index = branch_html.find(first_branch)
branch_tag = branch_html[:first_branch_index + 5]
update_branch_tag = branch_html[first_branch_index + 5:]
update_branch_tag = update_branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
update_branch_tag = branch_tag + update_branch_tag
except ValueError:
return ""
else:
line_feed_index = branch_html.find(line_item)
update_branch_tag = branch_html[:line_feed_index + len(line_item) + 1]
if "-" not in branch_list and "+" not in branch_list:
del_count = update_branch_tag.count("> # <")
update_branch_tag = update_branch_tag.replace("> # <", "> <", del_count - 1)
else:
update_branch_tag = update_branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
branch_tag = branch_html[line_feed_index + len(line_item) + 1:]
line_feed_index = branch_tag.find(line_item)
if line_feed_index == -1:
branch_tag = branch_tag.replace("> - <", "> <")
branch_tag = branch_tag.replace("> # <", "> <")
update_branch_tag += branch_tag
else:
loop_count = 0
while line_feed_index + 1:
loop_count += 1
if loop_count > 200:
continue
try:
update_branch_tag += branch_tag[:line_feed_index + len(line_item) + 1]
update_branch_tag = update_branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
branch_tag = branch_tag[line_feed_index + len(line_item) + 1:]
line_feed_index = branch_tag.find(line_item)
except ValueError:
return ""
branch_tag = branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
update_branch_tag += branch_tag
return update_branch_tag
def get_break_line_tag(self, content, origin_branch_html, branch_line):
"""
获取分支换行的判断条件源码tag
"""
get_tag = self.get_tag
left_brace_exist = False
line_break = loop_count = 0
while not left_brace_exist:
if loop_count > 10:
break
line_break += 1
origin_branch_html = os.path.join(origin_branch_html, "\n")
next_line_tag = get_tag(content, branch_line + line_break)
if "{" in next_line_tag:
left_brace_exist = True
origin_branch_html += next_line_tag
loop_count += 1
return origin_branch_html
def get_keyword_info(self):
"""
获取报备关键字信息
"""
try:
with open(self.keyword_file_path, "r") as file:
keyword_dict = json.load(file)
keyword_list = keyword_dict.get("KEYWORD")
return keyword_list
except (FileNotFoundError, AttributeError, FileExistsError):
return []
def get_coverage_lines_by_branch(self, file_path, content=None):
"""
获取覆盖率报告中的所有的if分支行号
@ -308,121 +497,6 @@ class KeywordRegistration:
return function_name
return function_name
@staticmethod
def get_branch_line_list(keyword_line: int, branch_line_list: list):
"""
获取大于关键字行号的所有分支行号
"""
if keyword_line in branch_line_list:
index = branch_line_list.index(keyword_line)
branch_line_list = branch_line_list[index:]
else:
for line in branch_line_list:
if line > keyword_line:
index = branch_line_list.index(line)
branch_line_list = branch_line_list[index:]
break
return branch_line_list
@staticmethod
def get_keyword_judge_char(keyword, keyword_source_code):
"""
获取关键字替代字符
"""
if "&" in keyword:
keyword = keyword.replace("&", "<")
keyword_index = keyword_source_code.find(keyword)
if keyword_index == -1:
return ""
try:
keyword_code = keyword_source_code[:keyword_index + len(keyword)]
if " = " in keyword_code:
judge_key = keyword_code.split(" = ")[0].split()[-1]
else:
bracket_index = keyword_code.find("(")
bracket_code = keyword_code[:bracket_index]
judge_key = bracket_code.split()[-1]
return judge_key
except (IndexError, ValueError):
return ""
@staticmethod
def get_branch_data_by_tag(tag_html: str, symbol_status=None):
"""
根据前端标签获取分支数据
"""
if symbol_status:
key = r"#+\-*"
else:
key = r"#+\-"
branch_line_list = re.findall(rf"> ([{key}]) </span>", tag_html)
return branch_line_list
@staticmethod
def get_judge_condition_index(judge_key: str, source_code: str):
"""
获取判断条件索引
"""
keyword_index_list = []
keyword_index_list_append = keyword_index_list.append
condition_str_list = re.split(rf"\|\||&&", source_code)
for index, code_str in enumerate(condition_str_list):
if judge_key in code_str:
keyword_index_list_append(index)
return keyword_index_list, condition_str_list
@staticmethod
def update_source_code_tag(html_tag: str):
replace_item_list = ["lineNum", "lineCov", "lineNoCov"]
for item in replace_item_list:
if item in html_tag:
replace_item = (item + "Update").lower()
html_tag = html_tag.replace(item, replace_item)
return html_tag
def get_break_line_tag(self, content, origin_branch_html, branch_line):
"""
获取分支换行的判断条件源码tag
"""
get_tag = self.get_tag
left_brace_exist = False
line_break = loop_count = 0
while not left_brace_exist:
if loop_count > 10:
break
line_break += 1
origin_branch_html = os.path.join(origin_branch_html, "\n")
next_line_tag = get_tag(content, branch_line + line_break)
if "{" in next_line_tag:
left_brace_exist = True
origin_branch_html += next_line_tag
loop_count += 1
return origin_branch_html
@staticmethod
def modify_tag_style(tag, rate):
"""
修改标签样式
"""
if 75 <= rate < 90:
tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryMed")
tag = tag.replace("coverPerLo", "coverPerMed")
tag = tag.replace("coverNumLo", "coverNumMed")
elif rate >= 90:
tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryHi")
tag = tag.replace("headerCovTableEntryMed", "headerCovTableEntryHi")
tag = tag.replace("coverPerLo", "coverPerHi")
tag = tag.replace("coverNumLo", "coverNumHi")
tag = tag.replace("coverPerMed", "coverPerHi")
tag = tag.replace("coverNumMed", "coverNumHi")
return tag
def update_coverage_ratio_tag(self, file_path):
"""
修改覆盖率比率数据
@ -583,187 +657,6 @@ class KeywordRegistration:
except (IndexError, TypeError, FileNotFoundError):
pass
def _check_if_branch_line(self, judge_key, sub_branch_line_list,
key_line, content, function_name):
"""
确定if分支行号
"""
if_branch_line = None
for branch_line in sub_branch_line_list:
if branch_line == key_line:
if_branch_line = key_line
break
# 获取分支行所在函数名
branch_function_name = self.get_line_funcname(
branch_line, content)
# 关键字范围只在关键字所在函数之内
branch_line_tag = self.get_tag(content, branch_line)
try:
if "{" not in branch_line_tag:
branch_line_tag = self.get_break_line_tag(
content, branch_line_tag, branch_line)
branch_line_source_code = self.get_source_code(
branch_line_tag)
if function_name == branch_function_name:
if judge_key in branch_line_source_code:
if_branch_line = branch_line
break
else:
break
except (ValueError, KeyError):
pass
return if_branch_line
@staticmethod
def _branch_replace(branch):
"""
分支符号替换
"""
if branch == "#":
branch = "> # <"
elif branch == "+":
branch = "> + <"
elif branch == "*":
branch = "> * <"
else:
branch = "> - <"
return branch
@staticmethod
def _single_condition_modify_html(branch_html, branch_list):
"""
单条件修改代码块html
"""
line_item = '<span class="lineNum"> </span>'
line_feed_index = branch_html.find(line_item)
if line_feed_index == -1:
if "+" in branch_list:
update_branch_tag = branch_html.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
else:
try:
first_branch = branch_list[0]
first_branch = "> " + first_branch + " <"
first_branch_index = branch_html.find(first_branch)
branch_tag = branch_html[:first_branch_index + 5]
update_branch_tag = branch_html[first_branch_index + 5:]
update_branch_tag = update_branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
update_branch_tag = branch_tag + update_branch_tag
except ValueError:
return ""
else:
line_feed_index = branch_html.find(line_item)
update_branch_tag = branch_html[:line_feed_index + len(line_item) + 1]
if "-" not in branch_list and "+" not in branch_list:
del_count = update_branch_tag.count("> # <")
update_branch_tag = update_branch_tag.replace("> # <", "> <", del_count - 1)
else:
update_branch_tag = update_branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
branch_tag = branch_html[line_feed_index + len(line_item) + 1:]
line_feed_index = branch_tag.find(line_item)
if line_feed_index == -1:
branch_tag = branch_tag.replace("> - <", "> <")
branch_tag = branch_tag.replace("> # <", "> <")
update_branch_tag += branch_tag
else:
loop_count = 0
while line_feed_index + 1:
loop_count += 1
if loop_count > 200:
continue
try:
update_branch_tag += branch_tag[:line_feed_index + len(line_item) + 1]
update_branch_tag = update_branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
branch_tag = branch_tag[line_feed_index + len(line_item) + 1:]
line_feed_index = branch_tag.find(line_item)
except ValueError:
return ""
branch_tag = branch_tag.replace("> - <", "> <")
update_branch_tag = update_branch_tag.replace("> # <", "> <")
update_branch_tag += branch_tag
return update_branch_tag
def _multi_condition_modify_html(self, branch_html, branch_length,
condition_str_list, judge_index_list):
"""
多条件修改代码块html
"""
line_item = '<span class="lineNum"> </span>'
if branch_length % len(condition_str_list):
line_feed_index = branch_html.find(line_item)
update_branch_tag = branch_html[:line_feed_index]
update_branch_tag = update_branch_tag.replace("> - <", "> <")
branch_html = branch_html[line_feed_index:]
loop_count = 0
while line_feed_index + 1:
loop_count += 1
if loop_count > 200:
continue
line_feed_index = branch_html.count(line_item)
if line_feed_index > 1:
try:
line_feed_length = len(line_item)
branch_tag_before = branch_html[:line_feed_length]
branch_html = branch_html[line_feed_length:]
line_feed_index = branch_html.find(line_item)
branch_tag_after = branch_html[:line_feed_index]
branch_tag_after = branch_tag_after.replace("> - <", "> <")
branch_tag = branch_tag_before + branch_tag_after
except ValueError:
return ""
else:
branch_tag = branch_html
branch_tag = branch_tag.replace("> - <", "> <")
# 不再换行,索引为-1
line_feed_index = -1
update_branch_tag += branch_tag
if line_feed_index == -1:
branch_html = ""
else:
branch_html = branch_html[line_feed_index:]
if branch_html != "":
branch_html = branch_html.replace("> - <", "> <")
update_branch_tag += branch_html
else:
branch_list = self.get_branch_data_by_tag(branch_html, True)
update_branch_tag = ""
end_count = -1
try:
for index in judge_index_list:
branch_data = branch_list[:(index + 1) * 2]
# 要修改的分支数据列表长度
branch_data_length = len(branch_data)
change_status = False
for count, branch in enumerate(branch_data, 1):
if count <= end_count:
continue
end_count = count
branch = self._branch_replace(branch)
end_index = branch_html.find(branch)
branch_tag = branch_html[:end_index + 5]
if branch_data_length - count in [0, 1]:
if change_status:
continue
if branch == "> # <":
change_status = True
branch_tag = branch_tag.replace("> # <", "> * <")
elif branch == "> - <":
change_status = True
branch_tag = branch_tag.replace("> - <", "> * <")
update_branch_tag += branch_tag
branch_html = branch_html[end_index + 5:]
except (ValueError, TypeError):
return ""
update_branch_tag += branch_html
return update_branch_tag
def judge_branch_exists(self, file_path):
"""
判断报告是否存在分支
@ -918,6 +811,114 @@ class KeywordRegistration:
pool.close()
pool.join()
def _check_if_branch_line(self, judge_key, sub_branch_line_list,
key_line, content, function_name):
"""
确定if分支行号
"""
if_branch_line = None
for branch_line in sub_branch_line_list:
if branch_line == key_line:
if_branch_line = key_line
break
# 获取分支行所在函数名
branch_function_name = self.get_line_funcname(
branch_line, content)
# 关键字范围只在关键字所在函数之内
branch_line_tag = self.get_tag(content, branch_line)
try:
if "{" not in branch_line_tag:
branch_line_tag = self.get_break_line_tag(
content, branch_line_tag, branch_line)
branch_line_source_code = self.get_source_code(
branch_line_tag)
if function_name == branch_function_name:
if judge_key in branch_line_source_code:
if_branch_line = branch_line
break
else:
break
except (ValueError, KeyError):
pass
return if_branch_line
def _multi_condition_modify_html(self, branch_html, branch_length,
condition_str_list, judge_index_list):
"""
多条件修改代码块html
"""
line_item = '<span class="lineNum"> </span>'
if branch_length % len(condition_str_list):
line_feed_index = branch_html.find(line_item)
update_branch_tag = branch_html[:line_feed_index]
update_branch_tag = update_branch_tag.replace("> - <", "> <")
branch_html = branch_html[line_feed_index:]
loop_count = 0
while line_feed_index + 1:
loop_count += 1
if loop_count > 200:
continue
line_feed_index = branch_html.count(line_item)
if line_feed_index > 1:
try:
line_feed_length = len(line_item)
branch_tag_before = branch_html[:line_feed_length]
branch_html = branch_html[line_feed_length:]
line_feed_index = branch_html.find(line_item)
branch_tag_after = branch_html[:line_feed_index]
branch_tag_after = branch_tag_after.replace("> - <", "> <")
branch_tag = branch_tag_before + branch_tag_after
except ValueError:
return ""
else:
branch_tag = branch_html
branch_tag = branch_tag.replace("> - <", "> <")
# 不再换行,索引为-1
line_feed_index = -1
update_branch_tag += branch_tag
if line_feed_index == -1:
branch_html = ""
else:
branch_html = branch_html[line_feed_index:]
if branch_html != "":
branch_html = branch_html.replace("> - <", "> <")
update_branch_tag += branch_html
else:
branch_list = self.get_branch_data_by_tag(branch_html, True)
update_branch_tag = ""
end_count = -1
try:
for index in judge_index_list:
branch_data = branch_list[:(index + 1) * 2]
# 要修改的分支数据列表长度
branch_data_length = len(branch_data)
change_status = False
for count, branch in enumerate(branch_data, 1):
if count <= end_count:
continue
end_count = count
branch = self._branch_replace(branch)
end_index = branch_html.find(branch)
branch_tag = branch_html[:end_index + 5]
if branch_data_length - count in [0, 1]:
if change_status:
continue
if branch == "> # <":
change_status = True
branch_tag = branch_tag.replace("> # <", "> * <")
elif branch == "> - <":
change_status = True
branch_tag = branch_tag.replace("> - <", "> * <")
update_branch_tag += branch_tag
branch_html = branch_html[end_index + 5:]
except (ValueError, TypeError):
return ""
update_branch_tag += branch_html
return update_branch_tag
def main(report_path):

View File

@ -38,6 +38,25 @@ LOG = platform_logger("BuildManager")
##############################################################################
class BuildManager(object):
@classmethod
def build_version(cls, project_root_path, product_form):
if BuildTestcases(project_root_path).build_version(product_form):
LOG.info("The version compiled successfully.")
build_result = True
else:
LOG.info("The version compilation failed, please modify.")
build_result = False
return build_result
@classmethod
def build_gn_file(cls, project_root_path, product_form):
if BuildTestcases(project_root_path).build_gn_file(product_form):
LOG.info("The gn compiled successfully.")
build_result = True
else:
LOG.info("The gn compilation failed, please modify.")
build_result = False
return build_result
@classmethod
def _make_gn_file(cls, filepath, target_list):
@ -120,7 +139,52 @@ class BuildManager(object):
LOG.info("Test case compilation failed, please modify.")
return build_result
# 编译入口
def build_testcases(self, project_root_path, param):
if not os.path.exists(project_root_path):
LOG.error("%s is not exists." % project_root_path)
return False
LOG.info("--------------------------------------------------")
LOG.info("Building parameter:")
LOG.info("productform = %s" % param.productform)
LOG.info("testtype = %s" % str(param.testtype))
LOG.info("partname_list = %s" % str(param.partname_list))
LOG.info("testmodule = %s" % param.testmodule)
LOG.info("testsuit = %s" % param.testsuit)
LOG.info("testcase = %s" % param.testcase)
LOG.info("--------------------------------------------------")
LOG.info("")
LOG.info("**************************************************")
LOG.info("*************** Start build testcases ************")
LOG.info("**************************************************")
LOG.info("")
build_xts_result = True
build_result = True
if "partdeps" == param.partdeps:
LOG.info("**********************Start prebuild testcases****************************")
build_deps_files_result = self._compile_deps_files(project_root_path, param)
if build_deps_files_result:
self._compile_part_deps(project_root_path, param)
if "acts" in param.testtype or "hats" in param.testtype or "hits" in param.testtype:
LOG.info("**********************Start build xts testcases****************************")
build_xts_result = self._compile_xts_test_cases(project_root_path, param)
else:
LOG.info("**********************Start build subsystem testcases****************************")
build_result = self._compile_testcases(project_root_path, param)
LOG.info("")
LOG.info("**************************************************")
LOG.info("*************** Ended build testcases ************")
LOG.info("**************************************************")
LOG.info("")
return build_result and build_xts_result
# 编译入口
def _compile_testcases(self, project_root_path, para):
# 获取所有支持的产品3.1Release版本为["DAYU","Hi3516DV300","ohos-arm64","ohos-sdk","rk3568"]
all_product_list = scan_support_product()
@ -191,70 +255,5 @@ class BuildManager(object):
return build_result
@classmethod
def build_version(cls, project_root_path, product_form):
if BuildTestcases(project_root_path).build_version(product_form):
LOG.info("The version compiled successfully.")
build_result = True
else:
LOG.info("The version compilation failed, please modify.")
build_result = False
return build_result
@classmethod
def build_gn_file(cls, project_root_path, product_form):
if BuildTestcases(project_root_path).build_gn_file(product_form):
LOG.info("The gn compiled successfully.")
build_result = True
else:
LOG.info("The gn compilation failed, please modify.")
build_result = False
return build_result
def build_testcases(self, project_root_path, param):
if not os.path.exists(project_root_path):
LOG.error("%s is not exists." % project_root_path)
return False
LOG.info("--------------------------------------------------")
LOG.info("Building parameter:")
LOG.info("productform = %s" % param.productform)
LOG.info("testtype = %s" % str(param.testtype))
LOG.info("partname_list = %s" % str(param.partname_list))
LOG.info("testmodule = %s" % param.testmodule)
LOG.info("testsuit = %s" % param.testsuit)
LOG.info("testcase = %s" % param.testcase)
LOG.info("--------------------------------------------------")
LOG.info("")
LOG.info("**************************************************")
LOG.info("*************** Start build testcases ************")
LOG.info("**************************************************")
LOG.info("")
build_xts_result = True
build_result = True
if "partdeps" == param.partdeps:
LOG.info("**********************Start prebuild testcases****************************")
build_deps_files_result = self._compile_deps_files(project_root_path, param)
if build_deps_files_result:
self._compile_part_deps(project_root_path, param)
if "acts" in param.testtype or "hats" in param.testtype or "hits" in param.testtype:
LOG.info("**********************Start build xts testcases****************************")
build_xts_result = self._compile_xts_test_cases(project_root_path, param)
else:
LOG.info("**********************Start build subsystem testcases****************************")
build_result = self._compile_testcases(project_root_path, param)
LOG.info("")
LOG.info("**************************************************")
LOG.info("*************** Ended build testcases ************")
LOG.info("**************************************************")
LOG.info("")
return build_result and build_xts_result
##############################################################################
##############################################################################

View File

@ -110,6 +110,109 @@ class BuildTestcases(object):
if os.path.exists(xts_testcase_out_dir):
shutil.rmtree(xts_testcase_out_dir)
def build_fuzz_testcases(self, para):
self._delete_testcase_dir(para.productform)
helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py")
command = [sys.executable, helper_path, 'make',
'make_temp_test', para.productform]
if subprocess.call(command, shell=False) == 0:
build_result = True
else:
build_result = False
self._merge_testcase_dir(para.productform)
return build_result
# 编译测试用例(编译命令拼接)
def build_testcases(self, productform, target):
command = []
if self.is_build_example:
command.append("--gn-args")
command.append("build_example=true")
if isinstance(target, list):
for test in target:
command.append("--build-target")
command.append(test + "_test")
elif isinstance(target, str):
for test in target.split(','):
command.append("--build-target")
command.append(test)
if productform == "rk3568":
pass
else:
command.append("--abi-type")
command.append("generic_generic_arm_64only")
command.append("--device-type")
command.append(get_output_path().split("/")[-1])
command.append("--build-variant")
command.append("root")
command.append("--ccache")
self._delete_testcase_dir(productform)
build_result = self._execute_build_command(productform, command)
self._merge_testcase_dir(productform)
return build_result
# 编译XTS测试用例
def build_xts_testcases(self, para):
self._delete_xts_testcase_dir(para)
xts_build_command = []
if para.productform == "rk3568":
False
else:
xts_build_test_command = ["--abi-type", "generic_generic_arm_64only", "--device-type",
get_output_path().split("/")[-1], "--build-variant", "root", "--gn-args",
"build_xts=true", "--export-para", "xts_suitename:" + para.testtype[0]]
if len(para.subsystem) > 0:
input_subsystem = ",".join(para.subsystem)
xts_build_test_command.append("--build-target")
xts_build_test_command.append(input_subsystem)
return False
if para.testsuit != "" and len(para.subsystem) == 0:
LOG.error("Please specify subsystem.")
return False
xts_build_command.extend(xts_build_test_command)
xts_build_command.append("--ccache")
build_result = self._execute_build_xts_command(para, xts_build_command)
return build_result
def build_deps_files(self, productform):
command = ["--ccache", "--gn-args", "pycache_enable=true", "--gn-args",
"check_deps=true", "--build-only-gn"]
if productform == "rk3568":
pass
else:
command.append("--abi-type")
command.append("generic_generic_arm_64only")
command.append("--device-type")
command.append(get_output_path().split("/")[-1])
command.append("--build-variant")
command.append("root")
return self._execute_build_deps_files_command(productform, command)
# 部件间依赖关系预处理生成part_deps_info.json
def build_part_deps(self, para):
build_part_deps_result = self._execute_build_part_deps_command(para)
return build_part_deps_result
def build_gn_file(self, productform):
command = []
if self.is_build_example:
command.append("--gn-args")
command.append("build_example=true")
command.append("--build-only-gn")
command.append("--gn-args")
command.append(BUILD_TARGET_PLATFORM % productform)
return self._execute_build_command(productform, command)
def build_version(self, productform):
command = []
command.append("--build-target")
command.append("make_all")
command.append("--gn-args")
command.append(BUILD_TARGET_PLATFORM % productform)
return self._execute_build_command(productform, command)
def _delete_testcase_dir(self, productform):
if is_open_source_product(productform):
package_out_dir = os.path.join(
@ -299,108 +402,5 @@ class BuildTestcases(object):
os.chdir(current_path)
return build_result
def build_fuzz_testcases(self, para):
self._delete_testcase_dir(para.productform)
helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py")
command = [sys.executable, helper_path, 'make',
'make_temp_test', para.productform]
if subprocess.call(command, shell=False) == 0:
build_result = True
else:
build_result = False
self._merge_testcase_dir(para.productform)
return build_result
# 编译测试用例(编译命令拼接)
def build_testcases(self, productform, target):
command = []
if self.is_build_example:
command.append("--gn-args")
command.append("build_example=true")
if isinstance(target, list):
for test in target:
command.append("--build-target")
command.append(test + "_test")
elif isinstance(target, str):
for test in target.split(','):
command.append("--build-target")
command.append(test)
if productform == "rk3568":
pass
else:
command.append("--abi-type")
command.append("generic_generic_arm_64only")
command.append("--device-type")
command.append(get_output_path().split("/")[-1])
command.append("--build-variant")
command.append("root")
command.append("--ccache")
self._delete_testcase_dir(productform)
build_result = self._execute_build_command(productform, command)
self._merge_testcase_dir(productform)
return build_result
# 编译XTS测试用例
def build_xts_testcases(self, para):
self._delete_xts_testcase_dir(para)
xts_build_command = []
if para.productform == "rk3568":
False
else:
xts_build_test_command = ["--abi-type", "generic_generic_arm_64only", "--device-type",
get_output_path().split("/")[-1], "--build-variant", "root", "--gn-args",
"build_xts=true", "--export-para", "xts_suitename:" + para.testtype[0]]
if len(para.subsystem) > 0:
input_subsystem = ",".join(para.subsystem)
xts_build_test_command.append("--build-target")
xts_build_test_command.append(input_subsystem)
return False
if para.testsuit != "" and len(para.subsystem) == 0:
LOG.error("Please specify subsystem.")
return False
xts_build_command.extend(xts_build_test_command)
xts_build_command.append("--ccache")
build_result = self._execute_build_xts_command(para, xts_build_command)
return build_result
def build_deps_files(self, productform):
command = ["--ccache", "--gn-args", "pycache_enable=true", "--gn-args",
"check_deps=true", "--build-only-gn"]
if productform == "rk3568":
pass
else:
command.append("--abi-type")
command.append("generic_generic_arm_64only")
command.append("--device-type")
command.append(get_output_path().split("/")[-1])
command.append("--build-variant")
command.append("root")
return self._execute_build_deps_files_command(productform, command)
# 部件间依赖关系预处理生成part_deps_info.json
def build_part_deps(self, para):
build_part_deps_result = self._execute_build_part_deps_command(para)
return build_part_deps_result
def build_gn_file(self, productform):
command = []
if self.is_build_example:
command.append("--gn-args")
command.append("build_example=true")
command.append("--build-only-gn")
command.append("--gn-args")
command.append(BUILD_TARGET_PLATFORM % productform)
return self._execute_build_command(productform, command)
def build_version(self, productform):
command = []
command.append("--build-target")
command.append("make_all")
command.append("--gn-args")
command.append(BUILD_TARGET_PLATFORM % productform)
return self._execute_build_command(productform, command)
##############################################################################
##############################################################################

View File

@ -82,6 +82,54 @@ class SelectTargets(object):
part_path_dic[part_name] = part_path_list
return part_path_dic
def get_build_targets(self, productform, typelist, partlist, testmodule):
target_list = []
if productform == "" or len(typelist) == 0:
LOG.warning("Error: productform or typelist is empty.")
return []
if len(partlist) == 0 and testmodule != "":
LOG.warning(
"The part cannot be empty When the module is not empty.")
return []
# productform不为空typelisttest type[UT,MST,ST,PERF,ALL])不为空
# partlist和testmodule为空通过testtype获取部件列表
if len(partlist) == 0 and testmodule == "":
target_list = self._get_target_list_by_type(productform, typelist)
return target_list
# productform不为空typelisttest type[UT,MST,ST,PERF,ALL])不为空
# partlist不为空testmodule为空通过testtype、partlist一起获取部件列表
if len(partlist) != 0 and testmodule == "":
target_list = self._get_target_list_by_part(productform, typelist,
partlist)
return target_list
# productform不为空typelisttest type[UT,MST,ST,PERF,ALL])不为空
# partlist不为空testmodule不为空通过testtype、partlist、testmodule一起获取部件列表
if len(partlist) != 0 and testmodule != "":
target_list = self._get_target_list_by_module(productform,
typelist,
partlist,
testmodule)
return target_list
# 通过infos_for_testfwk.json文件获取所有子部件信息编译目录信息
# [{“部件名1”[~/OpenHarmony/out/rk3568/module_list_files/部件名1]}]
# 然后遍历这些目录中的mlf文件获取其中定义的label返回label集合
# 遍历时通过testmodule控制遍历的部件指定模块目录如果不定义则遍历子部件下面所有模块目录
# 遍历时通过partlist控制遍历指定部件目录如果不定义则遍历infos_for_testfwk.json文件中定义的所有子部件目录
def filter_build_targets(self, para):
productform = para.productform
typelist = para.testtype
partlist = para.partname_list
testmodule = para.testmodule
print("partlist = %s" % str(partlist))
target_list = self.get_build_targets(productform, typelist,
partlist, testmodule)
return target_list
def _get_target_list_from_path(self, typelist, check_path):
target_list = []
if os.path.exists(check_path):
@ -146,54 +194,5 @@ class SelectTargets(object):
target_list.extend(temp_list)
return target_list
def get_build_targets(self, productform, typelist, partlist, testmodule):
target_list = []
if productform == "" or len(typelist) == 0:
LOG.warning("Error: productform or typelist is empty.")
return []
if len(partlist) == 0 and testmodule != "":
LOG.warning(
"The part cannot be empty When the module is not empty.")
return []
# productform不为空typelisttest type[UT,MST,ST,PERF,ALL])不为空
# partlist和testmodule为空通过testtype获取部件列表
if len(partlist) == 0 and testmodule == "":
target_list = self._get_target_list_by_type(productform, typelist)
return target_list
# productform不为空typelisttest type[UT,MST,ST,PERF,ALL])不为空
# partlist不为空testmodule为空通过testtype、partlist一起获取部件列表
if len(partlist) != 0 and testmodule == "":
target_list = self._get_target_list_by_part(productform, typelist,
partlist)
return target_list
# productform不为空typelisttest type[UT,MST,ST,PERF,ALL])不为空
# partlist不为空testmodule不为空通过testtype、partlist、testmodule一起获取部件列表
if len(partlist) != 0 and testmodule != "":
target_list = self._get_target_list_by_module(productform,
typelist,
partlist,
testmodule)
return target_list
# 通过infos_for_testfwk.json文件获取所有子部件信息编译目录信息
# [{“部件名1”[~/OpenHarmony/out/rk3568/module_list_files/部件名1]}]
# 然后遍历这些目录中的mlf文件获取其中定义的label返回label集合
# 遍历时通过testmodule控制遍历的部件指定模块目录如果不定义则遍历子部件下面所有模块目录
# 遍历时通过partlist控制遍历指定部件目录如果不定义则遍历infos_for_testfwk.json文件中定义的所有子部件目录
def filter_build_targets(self, para):
productform = para.productform
typelist = para.testtype
partlist = para.partname_list
testmodule = para.testmodule
print("partlist = %s" % str(partlist))
target_list = self.get_build_targets(productform, typelist,
partlist, testmodule)
return target_list
##############################################################################
##############################################################################

View File

@ -65,47 +65,6 @@ class Console(object):
def __init__(self):
pass
def handler_ctrl_c(self, signalnum, frame):
pass
def handler_ctrl_z(self, signalnum, frame):
pass
def console(self, args):
"""
Main xDevice console providing user with the interface to interact
"""
EnvironmentManager()
if args is None or len(args) < 2:
self.wizard_dic = show_wizard_mode()
print(self.wizard_dic)
if self._build_version(self.wizard_dic["productform"]):
self._console()
else:
LOG.error("Build version failed, exit test framework.")
else:
self.command_parser(" ".join(args[1:]))
# 命令执行总入口
def _console(self):
if platform.system() != 'Windows':
signal.signal(signal.SIGTSTP, self.handler_ctrl_z) # ctrl+x linux
signal.signal(signal.SIGINT, self.handler_ctrl_c) # ctrl+c
while True:
try:
# 获取用户命令输入
usr_input = input(">>> ")
if usr_input == "":
continue
# 用户输入命令解析
self.command_parser(usr_input)
except SystemExit:
LOG.info("Program exit normally!")
return
except (IOError, EOFError, KeyboardInterrupt) as error:
LOG.exception("Input Error: %s" % error)
@staticmethod
def _parse_combination_param(combination_value):
@ -323,49 +282,6 @@ class Console(object):
return options, unparsed, valid_param
def command_parser(self, args):
try:
# 将用户输入的指令按空格拆分成字符串数组
para_list = args.split()
options, _, valid_param = self.argument_parser(para_list)
if options is None or not valid_param:
LOG.warning("options is None.")
return
# 根据命令行的命令选择不同的方法执行
command = options.action
if command == "":
LOG.warning("action is empty.")
return
if "productform" in self.wizard_dic.keys():
productform = self.wizard_dic["productform"]
options.productform = productform
else:
productform = options.productform
if command.startswith(ToolCommandType.TOOLCMD_KEY_HELP):
self._process_command_help(para_list)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_SHOW):
self._process_command_show(para_list, productform)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_GEN):
self._process_command_gen(command, options)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_RUN):
# 保存原始控制命令
options.current_raw_cmd = args
self._process_command_run(command, options)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_QUIT):
self._process_command_quit(command)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_LIST):
self._process_command_device(command)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_VERSION):
self._process_command_version(command)
else:
print("The %s command is not supported." % command)
except (AttributeError, IOError, IndexError, ImportError, NameError,
RuntimeError, SystemError, TypeError, ValueError) as exception:
LOG.exception(exception, exc_info=False)
@classmethod
def _params_pre_processing(cls, para_list):
if len(para_list) <= 1 or (
@ -477,6 +393,90 @@ class Console(object):
product_form)
return build_result
def handler_ctrl_c(self, signalnum, frame):
pass
def handler_ctrl_z(self, signalnum, frame):
pass
def console(self, args):
"""
Main xDevice console providing user with the interface to interact
"""
EnvironmentManager()
if args is None or len(args) < 2:
self.wizard_dic = show_wizard_mode()
print(self.wizard_dic)
if self._build_version(self.wizard_dic["productform"]):
self._console()
else:
LOG.error("Build version failed, exit test framework.")
else:
self.command_parser(" ".join(args[1:]))
def command_parser(self, args):
try:
# 将用户输入的指令按空格拆分成字符串数组
para_list = args.split()
options, _, valid_param = self.argument_parser(para_list)
if options is None or not valid_param:
LOG.warning("options is None.")
return
# 根据命令行的命令选择不同的方法执行
command = options.action
if command == "":
LOG.warning("action is empty.")
return
if "productform" in self.wizard_dic.keys():
productform = self.wizard_dic["productform"]
options.productform = productform
else:
productform = options.productform
if command.startswith(ToolCommandType.TOOLCMD_KEY_HELP):
self._process_command_help(para_list)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_SHOW):
self._process_command_show(para_list, productform)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_GEN):
self._process_command_gen(command, options)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_RUN):
# 保存原始控制命令
options.current_raw_cmd = args
self._process_command_run(command, options)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_QUIT):
self._process_command_quit(command)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_LIST):
self._process_command_device(command)
elif command.startswith(ToolCommandType.TOOLCMD_KEY_VERSION):
self._process_command_version(command)
else:
print("The %s command is not supported." % command)
except (AttributeError, IOError, IndexError, ImportError, NameError,
RuntimeError, SystemError, TypeError, ValueError) as exception:
LOG.exception(exception, exc_info=False)
# 命令执行总入口
def _console(self):
if platform.system() != 'Windows':
signal.signal(signal.SIGTSTP, self.handler_ctrl_z) # ctrl+x linux
signal.signal(signal.SIGINT, self.handler_ctrl_c) # ctrl+c
while True:
try:
# 获取用户命令输入
usr_input = input(">>> ")
if usr_input == "":
continue
# 用户输入命令解析
self.command_parser(usr_input)
except SystemExit:
LOG.info("Program exit normally!")
return
except (IOError, EOFError, KeyboardInterrupt) as error:
LOG.exception("Input Error: %s" % error)
@dataclass
class ConfigConst(object):

View File

@ -201,7 +201,6 @@ def select_user_input(data_list):
return select_item_value, select_item_index
# 选择productform
def select_productform():
select_value = "phone"

View File

@ -28,7 +28,22 @@ MODES = stat.S_IWUSR | stat.S_IRUSR
LOG = platform_logger("Gen")
class Gen(object):
@classmethod
def fuzz_dir_generation(cls, options):
helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py")
fuzz_path = os.path.join(sys.source_code_root_path, options.dirpath)
LOG.info("fuzz_path = %s" % fuzz_path)
if not os.path.exists(fuzz_path):
os.makedirs(fuzz_path)
LOG.info("make folder %s" % fuzz_path)
command = [sys.executable, helper_path, 'generate',
options.fuzzername, fuzz_path]
LOG.info("command %s" % command)
subprocess.call(command, shell=False)
def process_command_gen(self, options):
if (len(options.testtype) != 1) or (options.dirpath == "") or \
(options.fuzzername == ""):
@ -52,17 +67,3 @@ class Gen(object):
for target in fuzzer_list:
if target:
gn_file.write("\"%s\",\n" % target)
@classmethod
def fuzz_dir_generation(cls, options):
helper_path = os.path.join("..", "libs", "fuzzlib", "fuzzer_helper.py")
fuzz_path = os.path.join(sys.source_code_root_path, options.dirpath)
LOG.info("fuzz_path = %s" % fuzz_path)
if not os.path.exists(fuzz_path):
os.makedirs(fuzz_path)
LOG.info("make folder %s" % fuzz_path)
command = [sys.executable, helper_path, 'generate',
options.fuzzername, fuzz_path]
LOG.info("command %s" % command)
subprocess.call(command, shell=False)

View File

@ -54,279 +54,6 @@ class Run(object):
def get_history(self):
return self.history_cmd_list
def process_command_run(self, command, options):
current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" "))))
if options.coverage and platform.system() != "Windows":
if not options.pullgcda:
push_cov_path = os.path.join(sys.framework_root_dir, "localCoverage/push_coverage_so/push_coverage.py")
if os.path.exists(push_cov_path):
if str(options.testpart) == "[]" and str(options.subsystem) == "[]":
LOG.info("No subsystem or part input. Not push coverage so.")
elif str(options.testpart) != "[]" and str(options.subsystem) != "[]":
LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
else:
if str(options.testpart) != "[]":
param = str(options.testpart)
subprocess.run("python3 {} {} {}".format(
push_cov_path, "testpart", param), shell=True)
else:
param = str(options.subsystem)
subprocess.run("python3 {} {} {}".format(
push_cov_path, "subsystem", param), shell=True)
else:
print(f"{push_cov_path} not exists.")
init_gcov_path = os.path.join(sys.framework_root_dir, "localCoverage/resident_service/init_gcov.py")
if os.path.exists(init_gcov_path):
subprocess.run("python3 %s command_str=%s" % (
init_gcov_path, current_raw_cmd), shell=True)
else:
print(f"{init_gcov_path} not exists.")
para = Parameter()
test_type_list = para.get_testtype_list(options.testtype)
if len(test_type_list) == 0:
LOG.error("The testtype parameter is incorrect.")
return
options.testtype = test_type_list
parser = ParsePartsConfig(options.productform)
partname_list = parser.get_part_list(
options.subsystem,
options.testpart)
options.partname_list = partname_list
options.coverage_outpath = self.get_coverage_outpath(options)
LOG.info("")
LOG.info("------------------------------------")
LOG.info("Input parameter:")
LOG.info("productform = %s" % options.productform)
LOG.info("testtype = %s" % str(options.testtype))
LOG.info("subsystem = %s" % str(options.subsystem))
LOG.info("testpart = %s" % str(options.testpart))
LOG.info("testmodule = %s" % options.testmodule)
LOG.info("testsuit = %s" % options.testsuit)
LOG.info("testcase = %s" % options.testcase)
LOG.info("testlevel = %s" % options.testlevel)
LOG.info("testargs = %s" % options.testargs)
LOG.info("repeat = %s" % options.repeat)
LOG.info("retry = %s" % options.retry)
LOG.info("historylist = %s" % options.historylist)
LOG.info("runhistory = %s" % options.runhistory)
LOG.info("partname_list = %s" % str(options.partname_list))
LOG.info("partdeps = %s" % options.partdeps)
LOG.info("------------------------------------")
LOG.info("")
if not para.check_run_parameter(options):
LOG.error("Input parameter is incorrect.")
return
current_time = datetime.datetime.now()
#记录命令运行历史
need_record_history = False
cmd_record = {
"time" : str(current_time),
"raw_cmd" : options.current_raw_cmd,
"result" : "unknown",
"command": command,
"options": options
}
if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
or "--retry" in options.current_raw_cmd):
need_record_history = True
#打印历史记录
if options.historylist:
print("The latest command history is: %d" % len(self.history_cmd_list))
for index in range(0, len(self.history_cmd_list)):
cmd_record = self.history_cmd_list[index]
print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"],
cmd_record["raw_cmd"], cmd_record["result"]))
return
#重新运行历史里的一条命令
if options.runhistory > 0:
#如果记录大于10则认为非法
if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
print("input history command[%d] out of range:", options.runhistory)
return
cmd_record = self.history_cmd_list[options.runhistory - 1]
print("run history command:", cmd_record["raw_cmd"])
need_record_history = False
command = cmd_record["command"]
options = cmd_record["options"]
if options.retry:
if len(self.history_cmd_list) <= 0:
LOG.info("No history command exsit")
return
history_cmd = self.history_cmd_list[-1]
command = history_cmd["command"]
options = history_cmd["options"]
from xdevice import Variables
latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
tree = ElementTree.parse(latest_report_path)
root = tree.getroot()
has_failed_case = 0
test_targets = {}
fail_list = []
for child in root:
print(child.tag, ":", child.attrib)
for grand in child:
print(grand.tag, ":", grand.attrib)
for sub_child in grand:
if sub_child.tag == 'failure':
fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
fail_list.append(fail_case)
has_failed_case += 1
break
test_targets["class"] = fail_list
setattr(options, "testargs", test_targets)
print("retry option:", options)
if has_failed_case > 0:
if not self._build_test_cases(options):
LOG.error("Build test cases failed.")
return
scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
plugin_id=SchedulerType.SCHEDULER)[0]
scheduler.exec_command(command, options)
else:
LOG.info("No testcase to retry")
return
if not self._build_test_cases(options):
LOG.error("Build test cases failed.")
return
if "partdeps" == options.partdeps:
self.get_part_deps_list(options.productform, options.testpart)
options.testcases_path = self.get_external_deps_out_path(options.productform)
LOG.info("partdeps = %s" % options.partdeps)
if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype:
test_dict = self.get_xts_test_dict(options)
options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype)
options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype)
else:
test_dict = self.get_test_dict(options)
if not self._check_test_dictionary(test_dict):
LOG.error("The test file list is empty.")
return
if options.coverage and platform.system() != "Windows":
coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
if os.path.exists(coverage_path):
coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True)
coverage_process.communicate()
if ("distributedtest" in options.testtype and
len(options.testtype) == 1):
from core.command.distribute_utils import get_test_case
from core.command.distribute_utils \
import check_ditributetest_environment
from core.command.distribute_utils import make_device_info_file
from core.command.distribute_utils import make_reports
local_time = time.localtime()
create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
if not check_ditributetest_environment():
return
output_test = get_test_case(test_dict["CXX"])
if not output_test:
return
result_rootpath = os.path.join(sys.framework_root_dir,
"reports",
create_time)
log_path = os.path.join(result_rootpath, "log")
tmp_path = os.path.join(result_rootpath, "temp")
os.makedirs(log_path, exist_ok=True)
os.makedirs(tmp_path, exist_ok=True)
Scheduler.start_task_log(log_path)
make_device_info_file(tmp_path)
for case in output_test:
agent_target_name = case["agent_target_name"]
major_target_name = case["major_target_name"]
manager = DbinderTest(result_rootpath, case["suits_dir"])
manager.setUp()
manager.test_distribute(major_target_name, agent_target_name, options)
manager.tearDown()
make_reports(result_rootpath, start_time)
Scheduler.stop_task_logcat()
else:
options.testdict = test_dict
options.target_outpath = self.get_target_out_path(
options.productform)
scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
plugin_id=SchedulerType.SCHEDULER)[0]
if scheduler is None:
LOG.error("Can not find the scheduler plugin.")
else:
options.testcases_path = self.get_tests_out_path(options.productform)
options.resource_path = os.path.abspath(os.path.join(
sys.framework_root_dir, "..", "resource"))
if is_lite_product(options.productform,
sys.source_code_root_path):
if options.productform.find("wifiiot") != -1:
scheduler.update_test_type_in_source(".bin",
DeviceTestType.ctest_lite)
scheduler.update_ext_type_in_source("BIN",
DeviceTestType.ctest_lite)
else:
print("productform is not wifiiot")
scheduler.exec_command(command, options)
if need_record_history:
#读文件获取运行结果
from xdevice import Variables
latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
with open(latest_report_path) as report_file:
for report_line in report_file:
if "testsuites name=\"summary_report\"" in report_line:
result = report_line.replace("\n", "")
result = result.replace("<testsuites name=\"summary_report\" ", "")
result = result.replace(">", "")
cmd_record["result"] = result
break
if len(self.history_cmd_list) >= 10:
del self.history_cmd_list[0]
self.history_cmd_list.append(cmd_record)
if "fuzztest" == options.testtype[0] and options.coverage is False:
report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0]
latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus")
if os.path.exists(latest_corpus_path):
shutil.rmtree(latest_corpus_path)
shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path)
if options.coverage and platform.system() != "Windows":
pull_service_gcov_path = os.path.join(
sys.framework_root_dir, "localCoverage/resident_service/pull_service_gcda.py")
if os.path.exists(pull_service_gcov_path):
subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True)
else:
print(f"{pull_service_gcov_path} not exists.")
if not options.pullgcda:
cov_main_file_path = os.path.join(sys.framework_root_dir, "localCoverage/coverage_tools.py")
testpart = ",".join(list(map(str, options.partname_list)))
if os.path.exists(cov_main_file_path):
subprocess.run("python3 %s testpart=%s" % (
cov_main_file_path, testpart), shell=True)
else:
print(f"{cov_main_file_path} not exists.")
return
##############################################################
##############################################################
@classmethod
def get_target_out_path(cls, product_form):
target_out_path = UserConfigManager().get_test_cases_dir()
@ -425,6 +152,275 @@ class Run(object):
external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart)
return external_deps_path_list
def process_command_run(self, command, options):
current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" "))))
if options.coverage and platform.system() != "Windows":
if not options.pullgcda:
push_cov_path = os.path.join(sys.framework_root_dir, "localCoverage/push_coverage_so/push_coverage.py")
if os.path.exists(push_cov_path):
if str(options.testpart) == "[]" and str(options.subsystem) == "[]":
LOG.info("No subsystem or part input. Not push coverage so.")
elif str(options.testpart) != "[]" and str(options.subsystem) != "[]":
LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
else:
if str(options.testpart) != "[]":
param = str(options.testpart)
subprocess.run("python3 {} {} {}".format(
push_cov_path, "testpart", param), shell=True)
else:
param = str(options.subsystem)
subprocess.run("python3 {} {} {}".format(
push_cov_path, "subsystem", param), shell=True)
else:
print(f"{push_cov_path} not exists.")
init_gcov_path = os.path.join(sys.framework_root_dir, "localCoverage/resident_service/init_gcov.py")
if os.path.exists(init_gcov_path):
subprocess.run("python3 %s command_str=%s" % (
init_gcov_path, current_raw_cmd), shell=True)
else:
print(f"{init_gcov_path} not exists.")
para = Parameter()
test_type_list = para.get_testtype_list(options.testtype)
if len(test_type_list) == 0:
LOG.error("The testtype parameter is incorrect.")
return
options.testtype = test_type_list
parser = ParsePartsConfig(options.productform)
partname_list = parser.get_part_list(
options.subsystem,
options.testpart)
options.partname_list = partname_list
options.coverage_outpath = self.get_coverage_outpath(options)
LOG.info("")
LOG.info("------------------------------------")
LOG.info("Input parameter:")
LOG.info("productform = %s" % options.productform)
LOG.info("testtype = %s" % str(options.testtype))
LOG.info("subsystem = %s" % str(options.subsystem))
LOG.info("testpart = %s" % str(options.testpart))
LOG.info("testmodule = %s" % options.testmodule)
LOG.info("testsuit = %s" % options.testsuit)
LOG.info("testcase = %s" % options.testcase)
LOG.info("testlevel = %s" % options.testlevel)
LOG.info("testargs = %s" % options.testargs)
LOG.info("repeat = %s" % options.repeat)
LOG.info("retry = %s" % options.retry)
LOG.info("historylist = %s" % options.historylist)
LOG.info("runhistory = %s" % options.runhistory)
LOG.info("partname_list = %s" % str(options.partname_list))
LOG.info("partdeps = %s" % options.partdeps)
LOG.info("------------------------------------")
LOG.info("")
if not para.check_run_parameter(options):
LOG.error("Input parameter is incorrect.")
return
current_time = datetime.datetime.now()
#记录命令运行历史
need_record_history = False
cmd_record = {
"time" : str(current_time),
"raw_cmd" : options.current_raw_cmd,
"result" : "unknown",
"command": command,
"options": options
}
if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
or "--retry" in options.current_raw_cmd):
need_record_history = True
#打印历史记录
if options.historylist:
print("The latest command history is: %d" % len(self.history_cmd_list))
for index, cmd_record in enumerate(self.history_cmd_list):
print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"],
cmd_record["raw_cmd"], cmd_record["result"]))
return
#重新运行历史里的一条命令
if options.runhistory > 0:
#如果记录大于10则认为非法
if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
print("input history command[%d] out of range:", options.runhistory)
return
cmd_record = self.history_cmd_list[options.runhistory - 1]
print("run history command:", cmd_record["raw_cmd"])
need_record_history = False
command = cmd_record["command"]
options = cmd_record["options"]
if options.retry:
if len(self.history_cmd_list) <= 0:
LOG.info("No history command exsit")
return
history_cmd = self.history_cmd_list[-1]
command = history_cmd["command"]
options = history_cmd["options"]
from xdevice import Variables
latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
tree = ElementTree.parse(latest_report_path)
root = tree.getroot()
has_failed_case = 0
test_targets = {}
fail_list = []
for child in root:
print(child.tag, ":", child.attrib)
for grand in child:
print(grand.tag, ":", grand.attrib)
for sub_child in grand:
if sub_child.tag == 'failure':
fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
fail_list.append(fail_case)
has_failed_case += 1
break
test_targets["class"] = fail_list
setattr(options, "testargs", test_targets)
print("retry option:", options)
if has_failed_case > 0:
if not self._build_test_cases(options):
LOG.error("Build test cases failed.")
return
scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
plugin_id=SchedulerType.SCHEDULER)[0]
scheduler.exec_command(command, options)
else:
LOG.info("No testcase to retry")
return
if not self._build_test_cases(options):
LOG.error("Build test cases failed.")
return
if "partdeps" == options.partdeps:
self.get_part_deps_list(options.productform, options.testpart)
options.testcases_path = self.get_external_deps_out_path(options.productform)
LOG.info("partdeps = %s" % options.partdeps)
if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype:
test_dict = self.get_xts_test_dict(options)
options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype)
options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype)
else:
test_dict = self.get_test_dict(options)
if not self._check_test_dictionary(test_dict):
LOG.error("The test file list is empty.")
return
if options.coverage and platform.system() != "Windows":
coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
if os.path.exists(coverage_path):
coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True)
coverage_process.communicate()
if ("distributedtest" in options.testtype and
len(options.testtype) == 1):
from core.command.distribute_utils import get_test_case
from core.command.distribute_utils \
import check_ditributetest_environment
from core.command.distribute_utils import make_device_info_file
from core.command.distribute_utils import make_reports
local_time = time.localtime()
create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
if not check_ditributetest_environment():
return
output_test = get_test_case(test_dict.get("CXX", None)
if not output_test:
return
result_rootpath = os.path.join(sys.framework_root_dir,
"reports",
create_time)
log_path = os.path.join(result_rootpath, "log")
tmp_path = os.path.join(result_rootpath, "temp")
os.makedirs(log_path, exist_ok=True)
os.makedirs(tmp_path, exist_ok=True)
Scheduler.start_task_log(log_path)
make_device_info_file(tmp_path)
for case in output_test:
agent_target_name = case["agent_target_name"]
major_target_name = case["major_target_name"]
manager = DbinderTest(result_rootpath, case["suits_dir"])
manager.setUp()
manager.test_distribute(major_target_name, agent_target_name, options)
manager.tearDown()
make_reports(result_rootpath, start_time)
Scheduler.stop_task_logcat()
else:
options.testdict = test_dict
options.target_outpath = self.get_target_out_path(
options.productform)
scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
plugin_id=SchedulerType.SCHEDULER)[0]
if scheduler is None:
LOG.error("Can not find the scheduler plugin.")
else:
options.testcases_path = self.get_tests_out_path(options.productform)
options.resource_path = os.path.abspath(os.path.join(
sys.framework_root_dir, "..", "resource"))
if is_lite_product(options.productform,
sys.source_code_root_path):
if options.productform.find("wifiiot") != -1:
scheduler.update_test_type_in_source(".bin",
DeviceTestType.ctest_lite)
scheduler.update_ext_type_in_source("BIN",
DeviceTestType.ctest_lite)
else:
print("productform is not wifiiot")
scheduler.exec_command(command, options)
if need_record_history:
#读文件获取运行结果
from xdevice import Variables
latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
with open(latest_report_path) as report_file:
for report_line in report_file:
if "testsuites name=\"summary_report\"" in report_line:
result = report_line.replace("\n", "")
result = result.replace("<testsuites name=\"summary_report\" ", "")
result = result.replace(">", "")
cmd_record["result"] = result
break
if len(self.history_cmd_list) >= 10:
del self.history_cmd_list[0]
self.history_cmd_list.append(cmd_record)
if "fuzztest" == options.testtype[0] and options.coverage is False:
report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0]
latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus")
if os.path.exists(latest_corpus_path):
shutil.rmtree(latest_corpus_path)
shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path)
if options.coverage and platform.system() != "Windows":
pull_service_gcov_path = os.path.join(
sys.framework_root_dir, "localCoverage/resident_service/pull_service_gcda.py")
if os.path.exists(pull_service_gcov_path):
subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True)
else:
print(f"{pull_service_gcov_path} not exists.")
if not options.pullgcda:
cov_main_file_path = os.path.join(sys.framework_root_dir, "localCoverage/coverage_tools.py")
testpart = ",".join(list(map(str, options.partname_list)))
if os.path.exists(cov_main_file_path):
subprocess.run("python3 %s testpart=%s" % (
cov_main_file_path, testpart), shell=True)
else:
print(f"{cov_main_file_path} not exists.")
return
def get_xts_test_dict(self, options):
# 获取XTS测试用例编译结果路径
xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype)
@ -443,5 +439,3 @@ class Run(object):
test_dict = TestCaseManager().get_test_files(test_case_path, options)
return test_dict

View File

@ -175,6 +175,17 @@ class UserConfigManager(object):
self.filepath = os.path.abspath(
os.path.join(CONFIG_PATH, config_file))
@classmethod
def content_strip(cls, content):
return content.strip()
@classmethod
def _verify_duplicate(cls, items):
if len(set(items)) != len(items):
LOG.warning("find duplicate sn config, configuration incorrect")
return False
return True
def get_user_config_list(self, tag_name):
data_dic = {}
try:
@ -189,25 +200,6 @@ class UserConfigManager(object):
LOG.error(("Parse %s fail!" % self.filepath) + xml_exception.args)
return data_dic
@classmethod
def content_strip(cls, content):
return content.strip()
@classmethod
def _verify_duplicate(cls, items):
if len(set(items)) != len(items):
LOG.warning("find duplicate sn config, configuration incorrect")
return False
return True
def _handle_str(self, content):
config_list = map(self.content_strip, content.split(';'))
config_list = [item for item in config_list if item]
if config_list:
if not self._verify_duplicate(config_list):
return []
return config_list
def get_sn_list(self):
sn_select_list = []
try:
@ -280,6 +272,14 @@ class UserConfigManager(object):
if testcase_path != "":
testcase_path = os.path.abspath(testcase_path)
return testcase_path
def _handle_str(self, content):
config_list = map(self.content_strip, content.split(';'))
config_list = [item for item in config_list if item]
if config_list:
if not self._verify_duplicate(config_list):
return []
return config_list
class BuildConfigManager(object):

View File

@ -33,63 +33,6 @@ class ResourceManager(object):
def __init__(self):
pass
def get_resource_data_dic(self, testsuit_filepath):
resource_dir = ""
data_dic = {}
target_name, _ = self._get_file_name_extension(testsuit_filepath)
xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
if not os.path.exists(xml_filepath):
return data_dic, resource_dir
data_dic = self.get_resource_data(xml_filepath, target_name)
resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
return data_dic, resource_dir
def get_resource_data(self, xml_filepath, target_name):
data_dic = {}
if os.path.exists(xml_filepath):
data_dic = self._parse_resource_test_xml_file(
xml_filepath, target_name)
return data_dic
def _parse_resource_test_xml_file(self, filepath, targetname):
data_dic = {}
node = self.find_node_by_target(filepath, targetname)
if node:
target_attrib_list = []
target_attrib_list.append(node.attrib)
environment_data_list = []
env_node = node.find("environment")
if env_node:
environment_data_list.append(env_node.attrib)
for element in env_node.findall("device"):
environment_data_list.append(element.attrib)
for option_element in element.findall("option"):
environment_data_list.append(option_element.attrib)
preparer_data_list = []
pre_node = node.find("preparer")
if pre_node:
preparer_data_list.append(pre_node.attrib)
for element in pre_node.findall("option"):
preparer_data_list.append(element.attrib)
cleaner_data_list = []
clr_node = node.find("cleaner")
if clr_node:
cleaner_data_list.append(clr_node.attrib)
for element in clr_node.findall("option"):
cleaner_data_list.append(element.attrib)
data_dic["nodeattrib"] = target_attrib_list
data_dic["environment"] = environment_data_list
data_dic["preparer"] = preparer_data_list
data_dic["cleaner"] = cleaner_data_list
return data_dic
@staticmethod
def find_node_by_target(file_path, targe_tname):
node = None
@ -109,9 +52,6 @@ class ResourceManager(object):
xml_exception.args)
return node
##########################################################################
##########################################################################
@classmethod
def _get_file_name_extension(cls, filepath):
_, fullname = os.path.split(filepath)
@ -127,64 +67,7 @@ class ResourceManager(object):
if len(dir_name_list) > 1:
dir_name = dir_name_list[-1]
return dir_name
def process_resource_file(self, resource_dir, preparer_list, device):
for item in preparer_list:
if "name" not in item.keys():
continue
if item["name"] == "push":
push_value = item["value"]
find_key = "->"
pos = push_value.find(find_key)
src = os.path.join(resource_dir, push_value[0:pos].strip())
dst = push_value[pos + len(find_key):len(push_value)].strip()
src = src.replace("/", os.sep)
dir_name = self.get_dir_name(src)
if dir_name != "":
dst = dst.rstrip("/") + "/" + dir_name
device.execute_shell_command("mkdir -p %s" % dst)
device.push_file(src, dst)
elif item["name"] == "pull":
push_value = item["value"]
find_key = "->"
pos = push_value.find(find_key)
src = os.path.join(resource_dir, push_value[0:pos].strip())
dst = push_value[pos + len(find_key):len(push_value)].strip()
device.pull_file(src, dst)
elif item["name"] == "shell":
command = item["value"].strip()
device.execute_shell_command(command)
else:
command = item["name"] + " " + item["value"]
command = command.strip()
device.connector_command(command)
def lite_process_resource_file(self, resource_dir, preparer_list):
for item in preparer_list:
if "name" not in item.keys():
continue
if item["name"] == "push":
copy_value = item["value"]
find_key = "->"
pos = copy_value.find(find_key)
src = os.path.join(resource_dir, copy_value[0:pos].strip())
dst = copy_value[pos + len(find_key):len(copy_value)].strip()
shutil.copy(src, dst)
elif item["name"] == "pull":
copy_value = item["value"]
find_key = "->"
pos = copy_value.find(find_key)
src = os.path.join(resource_dir, copy_value[0:pos].strip())
dst = copy_value[pos + len(find_key):len(copy_value)].strip()
shutil.copyfile(dst, src)
else:
command = item["name"] + " " + item["value"]
command = command.strip()
self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test)
@classmethod
def get_env_data(cls, environment_list):
env_data_dic = {}
@ -254,6 +137,124 @@ class ResourceManager(object):
curr_timeout = node_item_dic["timeout"]
return curr_timeout
def get_resource_data_dic(self, testsuit_filepath):
resource_dir = ""
data_dic = {}
target_name, _ = self._get_file_name_extension(testsuit_filepath)
xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
if not os.path.exists(xml_filepath):
return data_dic, resource_dir
data_dic = self.get_resource_data(xml_filepath, target_name)
resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
return data_dic, resource_dir
def get_resource_data(self, xml_filepath, target_name):
data_dic = {}
if os.path.exists(xml_filepath):
data_dic = self._parse_resource_test_xml_file(
xml_filepath, target_name)
return data_dic
def _parse_resource_test_xml_file(self, filepath, targetname):
data_dic = {}
node = self.find_node_by_target(filepath, targetname)
if node:
target_attrib_list = []
target_attrib_list.append(node.attrib)
environment_data_list = []
env_node = node.find("environment")
if env_node:
environment_data_list.append(env_node.attrib)
for element in env_node.findall("device"):
environment_data_list.append(element.attrib)
for option_element in element.findall("option"):
environment_data_list.append(option_element.attrib)
preparer_data_list = []
pre_node = node.find("preparer")
if pre_node:
preparer_data_list.append(pre_node.attrib)
for element in pre_node.findall("option"):
preparer_data_list.append(element.attrib)
cleaner_data_list = []
clr_node = node.find("cleaner")
if clr_node:
cleaner_data_list.append(clr_node.attrib)
for element in clr_node.findall("option"):
cleaner_data_list.append(element.attrib)
data_dic["nodeattrib"] = target_attrib_list
data_dic["environment"] = environment_data_list
data_dic["preparer"] = preparer_data_list
data_dic["cleaner"] = cleaner_data_list
return data_dic
##########################################################################
##########################################################################
def process_resource_file(self, resource_dir, preparer_list, device):
for item in preparer_list:
if "name" not in item.keys():
continue
if item["name"] == "push":
push_value = item["value"]
find_key = "->"
pos = push_value.find(find_key)
src = os.path.join(resource_dir, push_value[0:pos].strip())
dst = push_value[pos + len(find_key):len(push_value)].strip()
src = src.replace("/", os.sep)
dir_name = self.get_dir_name(src)
if dir_name != "":
dst = dst.rstrip("/") + "/" + dir_name
device.execute_shell_command("mkdir -p %s" % dst)
device.push_file(src, dst)
elif item["name"] == "pull":
push_value = item["value"]
find_key = "->"
pos = push_value.find(find_key)
src = os.path.join(resource_dir, push_value[0:pos].strip())
dst = push_value[pos + len(find_key):len(push_value)].strip()
device.pull_file(src, dst)
elif item["name"] == "shell":
command = item["value"].strip()
device.execute_shell_command(command)
else:
command = item["name"] + " " + item["value"]
command = command.strip()
device.connector_command(command)
def lite_process_resource_file(self, resource_dir, preparer_list):
for item in preparer_list:
if "name" not in item.keys():
continue
if item["name"] == "push":
copy_value = item["value"]
find_key = "->"
pos = copy_value.find(find_key)
src = os.path.join(resource_dir, copy_value[0:pos].strip())
dst = copy_value[pos + len(find_key):len(copy_value)].strip()
shutil.copy(src, dst)
elif item["name"] == "pull":
copy_value = item["value"]
find_key = "->"
pos = copy_value.find(find_key)
src = os.path.join(resource_dir, copy_value[0:pos].strip())
dst = copy_value[pos + len(find_key):len(copy_value)].strip()
shutil.copyfile(dst, src)
else:
command = item["name"] + " " + item["value"]
command = command.strip()
self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test)
def get_environment_data(self, data_dic):
env_data_dic = {}
if "environment" in data_dic.keys():

View File

@ -539,6 +539,31 @@ class CppTestDriver(IDriver):
config = None
result = ""
@staticmethod
def _alter_init(name):
with open(name, "rb") as f:
lines = f.read()
str_content = lines.decode("utf-8")
pattern_sharp = '^\s*#.*$(\n|\r\n)'
pattern_star = '/\*.*?\*/(\n|\r\n)+'
pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
if re.match(pattern_sharp, str_content, flags=re.M):
striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
elif re.findall(pattern_star, str_content, flags=re.S):
striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
elif re.findall(pattern_xml, str_content, flags=re.S):
striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
else:
striped_content = str_content
striped_bt = striped_content.encode("utf-8")
if os.path.exists(name):
os.remove(name)
with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
f.write(striped_bt)
def __check_environment__(self, device_options):
if len(device_options) == 1 and device_options[0].label is None:
return True
@ -714,31 +739,6 @@ class CppTestDriver(IDriver):
resource_dir,
self.config.device)
@staticmethod
def _alter_init(name):
with open(name, "rb") as f:
lines = f.read()
str_content = lines.decode("utf-8")
pattern_sharp = '^\s*#.*$(\n|\r\n)'
pattern_star = '/\*.*?\*/(\n|\r\n)+'
pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
if re.match(pattern_sharp, str_content, flags=re.M):
striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
elif re.findall(pattern_star, str_content, flags=re.S):
striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
elif re.findall(pattern_xml, str_content, flags=re.S):
striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
else:
striped_content = str_content
striped_bt = striped_content.encode("utf-8")
if os.path.exists(name):
os.remove(name)
with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
f.write(striped_bt)
def _push_corpus_cov_if_exist(self, suite_file):
corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep)
cov_file = os.path.join(
@ -838,6 +838,124 @@ class JSUnitTestDriver(IDriver):
self.hilog = None
self.hilog_proc = None
@staticmethod
def _get_acts_test_para(testcase,
testlevel,
testtype,
target_test_path,
suite_file,
filename):
if "actstest" == testtype[0]:
test_para = (" --actstest_out_format=json"
" --actstest_out=%s%s.json") % (
target_test_path, filename)
return test_para
if "" != testcase and "" == testlevel:
test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
elif "" == testcase and "" != testlevel:
level_para = get_level_para_string(testlevel)
test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
else:
test_para = ""
return test_para
@staticmethod
def _get_hats_test_para(testcase,
testlevel,
testtype,
target_test_path,
suite_file,
filename):
if "hatstest" == testtype[0]:
test_hats_para = (" --hatstest_out_format=json"
" --hatstest_out=%s%s.json") % (
target_test_path, filename)
return test_hats_para
if "" != testcase and "" == testlevel:
test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
elif "" == testcase and "" != testlevel:
level_para = get_level_para_string(testlevel)
test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
else:
test_hats_para = ""
return test_hats_para
@classmethod
def _get_json_shell_timeout(cls, json_filepath):
test_timeout = 0
try:
with open(json_filepath, 'r') as json_file:
data_dic = json.load(json_file)
if not data_dic:
return test_timeout
else:
if "driver" in data_dic.keys():
driver_dict = data_dic.get("driver")
if driver_dict and "test-timeout" in driver_dict.keys():
test_timeout = int(driver_dict["shell-timeout"]) / 1000
return test_timeout
except JSONDecodeError:
return test_timeout
finally:
print(" get json shell timeout finally")
@staticmethod
def _get_package_and_ability_name(hap_filepath):
package_name = ""
ability_name = ""
if os.path.exists(hap_filepath):
filename = os.path.basename(hap_filepath)
# unzip the hap file
hap_bak_path = os.path.abspath(os.path.join(
os.path.dirname(hap_filepath),
"%s.bak" % filename))
zf_desc = zipfile.ZipFile(hap_filepath)
try:
zf_desc.extractall(path=hap_bak_path)
except RuntimeError as error:
print("Unzip error: ", hap_bak_path)
zf_desc.close()
# verify config.json file
app_profile_path = os.path.join(hap_bak_path, "config.json")
if not os.path.exists(app_profile_path):
print("file %s not exist" % app_profile_path)
return package_name, ability_name
if os.path.isdir(app_profile_path):
print("%s is a folder, and not a file" % app_profile_path)
return package_name, ability_name
# get package_name and ability_name value
load_dict = {}
with open(app_profile_path, 'r') as load_f:
load_dict = json.load(load_f)
profile_list = load_dict.values()
for profile in profile_list:
package_name = profile.get("package")
if not package_name:
continue
abilities = profile.get("abilities")
for abilitie in abilities:
abilities_name = abilitie.get("name")
if abilities_name.startswith("."):
ability_name = package_name + abilities_name[
abilities_name.find("."):]
else:
ability_name = abilities_name
break
break
# delete hap_bak_path
if os.path.exists(hap_bak_path):
shutil.rmtree(hap_bak_path)
else:
print("file %s not exist" % hap_filepath)
return package_name, ability_name
def __check_environment__(self, device_options):
pass
@ -1053,124 +1171,6 @@ class JSUnitTestDriver(IDriver):
_sleep_according_to_result(return_message)
return return_message
@staticmethod
def _get_acts_test_para(testcase,
testlevel,
testtype,
target_test_path,
suite_file,
filename):
if "actstest" == testtype[0]:
test_para = (" --actstest_out_format=json"
" --actstest_out=%s%s.json") % (
target_test_path, filename)
return test_para
if "" != testcase and "" == testlevel:
test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
elif "" == testcase and "" != testlevel:
level_para = get_level_para_string(testlevel)
test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
else:
test_para = ""
return test_para
@staticmethod
def _get_hats_test_para(testcase,
testlevel,
testtype,
target_test_path,
suite_file,
filename):
if "hatstest" == testtype[0]:
test_hats_para = (" --hatstest_out_format=json"
" --hatstest_out=%s%s.json") % (
target_test_path, filename)
return test_hats_para
if "" != testcase and "" == testlevel:
test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
elif "" == testcase and "" != testlevel:
level_para = get_level_para_string(testlevel)
test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
else:
test_hats_para = ""
return test_hats_para
@classmethod
def _get_json_shell_timeout(cls, json_filepath):
test_timeout = 0
try:
with open(json_filepath, 'r') as json_file:
data_dic = json.load(json_file)
if not data_dic:
return test_timeout
else:
if "driver" in data_dic.keys():
driver_dict = data_dic.get("driver")
if driver_dict and "test-timeout" in driver_dict.keys():
test_timeout = int(driver_dict["shell-timeout"]) / 1000
return test_timeout
except JSONDecodeError:
return test_timeout
finally:
print(" get json shell timeout finally")
@staticmethod
def _get_package_and_ability_name(hap_filepath):
package_name = ""
ability_name = ""
if os.path.exists(hap_filepath):
filename = os.path.basename(hap_filepath)
# unzip the hap file
hap_bak_path = os.path.abspath(os.path.join(
os.path.dirname(hap_filepath),
"%s.bak" % filename))
zf_desc = zipfile.ZipFile(hap_filepath)
try:
zf_desc.extractall(path=hap_bak_path)
except RuntimeError as error:
print("Unzip error: ", hap_bak_path)
zf_desc.close()
# verify config.json file
app_profile_path = os.path.join(hap_bak_path, "config.json")
if not os.path.exists(app_profile_path):
print("file %s not exist" % app_profile_path)
return package_name, ability_name
if os.path.isdir(app_profile_path):
print("%s is a folder, and not a file" % app_profile_path)
return package_name, ability_name
# get package_name and ability_name value
load_dict = {}
with open(app_profile_path, 'r') as load_f:
load_dict = json.load(load_f)
profile_list = load_dict.values()
for profile in profile_list:
package_name = profile.get("package")
if not package_name:
continue
abilities = profile.get("abilities")
for abilitie in abilities:
abilities_name = abilitie.get("name")
if abilities_name.startswith("."):
ability_name = package_name + abilities_name[
abilities_name.find("."):]
else:
ability_name = abilities_name
break
break
# delete hap_bak_path
if os.path.exists(hap_bak_path):
shutil.rmtree(hap_bak_path)
else:
print("file %s not exist" % hap_filepath)
return package_name, ability_name
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
class OHRustTestDriver(IDriver):

View File

@ -124,6 +124,23 @@ class LiteUnitTest(IDriver):
return
self.log.info("lite device execute request success")
def __result__(self):
pass
def show_help_info(self):
"""
show help info.
"""
self.log.info("this is test driver for cpp test")
return
def show_driver_info(self):
"""
show driver info.
"""
self.log.info("this is test driver for cpp test")
return
def _mount_nfs_server(self):
#before execute each suits bin, mount nfs
self.mnt_cmd = "mount {}".format(UserConfigManager().get_user_config(
@ -287,23 +304,6 @@ class LiteUnitTest(IDriver):
time.sleep(5)
return False
def show_help_info(self):
"""
show help info.
"""
self.log.info("this is test driver for cpp test")
return
def show_driver_info(self):
"""
show driver info.
"""
self.log.info("this is test driver for cpp test")
return
def __result__(self):
pass
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
class CTestDriver(IDriver):
@ -365,6 +365,9 @@ class CTestDriver(IDriver):
self.error_message,
report_name)
def __result__(self):
return self.result if os.path.exists(self.result) else ""
def _run_ctest(self, source=None, request=None):
if not source:
LOG.error("Error: %s don't exist." % source, error_no="00101")
@ -435,10 +438,6 @@ class CTestDriver(IDriver):
reset_cmd = [int(item, 16) for item in reset_cmd]
return reset_cmd
def __result__(self):
return self.result if os.path.exists(self.result) else ""
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
class JSUnitTestLiteDriver(IDriver):
@ -457,23 +456,7 @@ class JSUnitTestLiteDriver(IDriver):
def __check_config__(self, config):
pass
def _get_driver_config(self, json_config):
bundle_name = get_config_value('bundle-name',
json_config.get_driver(), False)
if not bundle_name:
raise ParamError("Can't find bundle-name in config file.",
error_no="00108")
else:
self.config.bundle_name = bundle_name
ability = get_config_value('ability',
json_config.get_driver(), False)
if not ability:
self.config.ability = "default"
else:
self.config.ability = ability
def __execute__(self, request):
try:
LOG.debug("Start execute xdevice extension JSUnit Test")
@ -536,6 +519,25 @@ class JSUnitTestLiteDriver(IDriver):
self.config.device.close()
def __result__(self):
return self.result if os.path.exists(self.result) else ""
def _get_driver_config(self, json_config):
bundle_name = get_config_value('bundle-name',
json_config.get_driver(), False)
if not bundle_name:
raise ParamError("Can't find bundle-name in config file.",
error_no="00108")
else:
self.config.bundle_name = bundle_name
ability = get_config_value('ability',
json_config.get_driver(), False)
if not ability:
self.config.ability = "default"
else:
self.config.ability = ability
def _run_jsunit(self, request):
parser_instances = []
parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
@ -564,5 +566,4 @@ class JSUnitTestLiteDriver(IDriver):
"\n".join(result.split("\n")[0:-1]), "\n"))
file_name.flush()
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -123,20 +123,11 @@ class OHJSUnitTestRunner:
self.finished_observer.notify_task_finished()
self.retry_times -= 1
def _get_shell_handler(self, listener):
parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
if parsers:
parsers = parsers[:1]
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = self.suites_name
parser_instance.listeners = listener
parser_instance.runner = self
parser_instances.append(parser_instance)
self.finished_observer = parser_instance
handler = ShellHandler(parser_instances)
return handler
def get_oh_test_runner_path(self):
if self.compile_mode == "esmodule":
return "/ets/testrunner/OpenHarmonyTestRunner"
else:
return "OpenHarmonyTestRunner"
def add_arg(self, name, value):
if not name or not value:
@ -157,6 +148,21 @@ class OHJSUnitTestRunner:
else:
args_commands = "%s -s %s %s " % (args_commands, key, value)
return args_commands
def _get_shell_handler(self, listener):
parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
if parsers:
parsers = parsers[:1]
parser_instances = []
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = self.suites_name
parser_instance.listeners = listener
parser_instance.runner = self
parser_instances.append(parser_instance)
self.finished_observer = parser_instance
handler = ShellHandler(parser_instances)
return handler
def _get_run_command(self):
command = ""
@ -191,12 +197,6 @@ class OHJSUnitTestRunner:
return command
def get_oh_test_runner_path(self):
if self.compile_mode == "esmodule":
return "/ets/testrunner/OpenHarmonyTestRunner"
else:
return "OpenHarmonyTestRunner"
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
class OHJSUnitTestDriver(IDriver):
@ -278,6 +278,9 @@ class OHJSUnitTestDriver(IDriver):
request.config.report_path, self.result, self.error_message)
update_xml(request.root.source.source_file, self.result)
def __result__(self):
return self.result if os.path.exists(self.result) else ""
def _run_oh_jsunit(self, config_file, request):
try:
if not os.path.exists(config_file):
@ -501,7 +504,3 @@ class OHJSUnitTestDriver(IDriver):
stop_catch_device_log(self.log_proc)
self.config.device.device_log_collector.\
stop_catch_device_log(self.hilog_proc)
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -47,6 +47,137 @@ FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]
class TestCaseManager(object):
@classmethod
def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
partlist = options.partname_list
testmodule = options.testmodule
testsuit = options.testsuit
if not suite_file.startswith(test_case_out_path):
return False
if testsuit != "":
short_name, _ = os.path.splitext(os.path.basename(suite_file))
testsuit_list = testsuit.split(',')
for test in testsuit_list:
if short_name.startswith(test) or \
testsuit.startswith(short_name):
return True
return False
is_valid_status = False
suitfile_subpath = suite_file.replace(test_case_out_path, "")
suitfile_subpath = suitfile_subpath.strip(os.sep)
if len(partlist) == 0:
if testmodule != "":
temp_list = suitfile_subpath.split(os.sep)
if len(temp_list) > 2 and testmodule == temp_list[1]:
is_valid_status = True
else:
is_valid_status = True
else:
for partname in partlist:
if testmodule != "":
if suitfile_subpath.startswith(
partname + os.sep + testmodule + os.sep):
is_valid_status = True
break
else:
if suitfile_subpath.startswith(partname + os.sep):
is_valid_status = True
break
return is_valid_status
@classmethod
def check_python_test_file(cls, suite_file):
if suite_file.endswith(".py"):
filename = os.path.basename(suite_file)
if filename.startswith("test_"):
return True
return False
@classmethod
def check_hap_test_file(cls, hap_file_path):
try:
if hap_file_path.endswith(".hap"):
json_file_path = hap_file_path.replace(".hap", ".json")
if os.path.exists(json_file_path):
with open(json_file_path, 'r') as json_file:
data_dic = json.load(json_file)
if not data_dic:
return False
else:
if "kits" in data_dic.keys():
kits_list = data_dic.get("kits")
if len(kits_list) > 0:
for kits_dict in kits_list:
if "test-file-name" not in kits_dict.keys():
continue
else:
return True
else:
return False
return False
except JSONDecodeError:
return False
finally:
print(" check hap test file finally")
@classmethod
def get_hap_test_driver(cls, hap_file_path):
data_dic = cls.get_hap_json(hap_file_path)
if not data_dic:
return ""
else:
if "driver" in data_dic.keys():
driver_dict = data_dic.get("driver")
if bool(driver_dict):
driver_type = driver_dict.get("type")
return driver_type
else:
LOG.error("%s has not set driver." % hap_file_path)
return ""
else:
return ""
@classmethod
def get_hap_json(cls, hap_file_path):
if hap_file_path.endswith(".hap"):
json_file_path = hap_file_path.replace(".hap", ".json")
if os.path.exists(json_file_path):
with open(json_file_path, 'r') as json_file:
data_dic = json.load(json_file)
return data_dic
else:
return {}
else:
return {}
@classmethod
def get_hap_part_json(cls, hap_file_path):
if hap_file_path.endswith(".hap"):
json_file_path = hap_file_path.replace(".hap", ".moduleInfo")
if os.path.exists(json_file_path):
with open(json_file_path, 'r') as json_file:
data_dic = json.load(json_file)
return data_dic
else:
return {}
else:
return {}
@classmethod
def get_part_name_test_file(cls, hap_file_path):
data_dic = cls.get_hap_part_json(hap_file_path)
if not data_dic:
return ""
else:
if "part" in data_dic.keys():
part_name = data_dic["part"]
return part_name
else:
return ""
def get_test_files(self, test_case_path, options):
LOG.info("test case path: " + test_case_path)
LOG.info("test type list: " + str(options.testtype))
@ -219,134 +350,3 @@ class TestCaseManager(object):
if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest":
xts_suit_file_dic.get("JST").append(xts_suite_file)
return xts_suit_file_dic
@classmethod
def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
partlist = options.partname_list
testmodule = options.testmodule
testsuit = options.testsuit
if not suite_file.startswith(test_case_out_path):
return False
if testsuit != "":
short_name, _ = os.path.splitext(os.path.basename(suite_file))
testsuit_list = testsuit.split(',')
for test in testsuit_list:
if short_name.startswith(test) or \
testsuit.startswith(short_name):
return True
return False
is_valid_status = False
suitfile_subpath = suite_file.replace(test_case_out_path, "")
suitfile_subpath = suitfile_subpath.strip(os.sep)
if len(partlist) == 0:
if testmodule != "":
temp_list = suitfile_subpath.split(os.sep)
if len(temp_list) > 2 and testmodule == temp_list[1]:
is_valid_status = True
else:
is_valid_status = True
else:
for partname in partlist:
if testmodule != "":
if suitfile_subpath.startswith(
partname + os.sep + testmodule + os.sep):
is_valid_status = True
break
else:
if suitfile_subpath.startswith(partname + os.sep):
is_valid_status = True
break
return is_valid_status
@classmethod
def check_python_test_file(cls, suite_file):
if suite_file.endswith(".py"):
filename = os.path.basename(suite_file)
if filename.startswith("test_"):
return True
return False
@classmethod
def check_hap_test_file(cls, hap_file_path):
try:
if hap_file_path.endswith(".hap"):
json_file_path = hap_file_path.replace(".hap", ".json")
if os.path.exists(json_file_path):
with open(json_file_path, 'r') as json_file:
data_dic = json.load(json_file)
if not data_dic:
return False
else:
if "kits" in data_dic.keys():
kits_list = data_dic.get("kits")
if len(kits_list) > 0:
for kits_dict in kits_list:
if "test-file-name" not in kits_dict.keys():
continue
else:
return True
else:
return False
return False
except JSONDecodeError:
return False
finally:
print(" check hap test file finally")
@classmethod
def get_hap_test_driver(cls, hap_file_path):
data_dic = cls.get_hap_json(hap_file_path)
if not data_dic:
return ""
else:
if "driver" in data_dic.keys():
driver_dict = data_dic.get("driver")
if bool(driver_dict):
driver_type = driver_dict.get("type")
return driver_type
else:
LOG.error("%s has not set driver." % hap_file_path)
return ""
else:
return ""
@classmethod
def get_hap_json(cls, hap_file_path):
if hap_file_path.endswith(".hap"):
json_file_path = hap_file_path.replace(".hap", ".json")
if os.path.exists(json_file_path):
with open(json_file_path, 'r') as json_file:
data_dic = json.load(json_file)
return data_dic
else:
return {}
else:
return {}
@classmethod
def get_hap_part_json(cls, hap_file_path):
if hap_file_path.endswith(".hap"):
json_file_path = hap_file_path.replace(".hap", ".moduleInfo")
if os.path.exists(json_file_path):
with open(json_file_path, 'r') as json_file:
data_dic = json.load(json_file)
return data_dic
else:
return {}
else:
return {}
@classmethod
def get_part_name_test_file(cls, hap_file_path):
data_dic = cls.get_hap_part_json(hap_file_path)
if not data_dic:
return ""
else:
if "part" in data_dic.keys():
part_name = data_dic["part"]
return part_name
else:
return ""

View File

@ -49,6 +49,23 @@ class DeployKit(ITestKit):
self.burn_command = ""
self.timeout = ""
self.paths = ""
def copy_file_as_temp(self, original_file, str_length):
"""
To obtain a random string with specified length
Parameters:
original_file : the original file path
str_length: the length of random string
"""
if os.path.isfile(original_file):
random_str = random.sample(string.ascii_letters + string.digits,
str_length)
new_temp_tool_path = '{}_{}{}'.format(
os.path.splitext(original_file)[0], "".join(random_str),
os.path.splitext(original_file)[1])
return shutil.copyfile(original_file, new_temp_tool_path)
else:
return ""
def __check_config__(self, config):
self.timeout = str(int(get_config_value(
@ -63,6 +80,22 @@ class DeployKit(ITestKit):
"burn_file:{}".format(self.timeout, self.burn_file)
raise ParamError(msg, error_no="00108")
def __setup__(self, device, **kwargs):
"""
Execute reset command on the device by cmd serial port and then upload
patch file by deploy tool.
Parameters:
device: the instance of LocalController with one or more
ComController
"""
args = kwargs
source_file = args.get("source_file", None)
self._reset(device)
self._send_file(device, source_file)
def __teardown__(self, device):
pass
def _reset(self, device):
cmd_com = device.device.com_dict.get(ComType.cmd_com)
try:
@ -112,36 +145,3 @@ class DeployKit(ITestKit):
device.device_allocation_state = DeviceAllocationState.unusable
raise LiteDeviceError("%s port set_up wifiiot failed" %
deploy_serial_port, error_no="00402")
def __setup__(self, device, **kwargs):
"""
Execute reset command on the device by cmd serial port and then upload
patch file by deploy tool.
Parameters:
device: the instance of LocalController with one or more
ComController
"""
args = kwargs
source_file = args.get("source_file", None)
self._reset(device)
self._send_file(device, source_file)
def __teardown__(self, device):
pass
def copy_file_as_temp(self, original_file, str_length):
"""
To obtain a random string with specified length
Parameters:
original_file : the original file path
str_length: the length of random string
"""
if os.path.isfile(original_file):
random_str = random.sample(string.ascii_letters + string.digits,
str_length)
new_temp_tool_path = '{}_{}{}'.format(
os.path.splitext(original_file)[0], "".join(random_str),
os.path.splitext(original_file)[1])
return shutil.copyfile(original_file, new_temp_tool_path)
else:
return ""