!255 JSON配置文件GBK编码时读取异常,设备重启后等待接口异常

Merge pull request !255 from liguangjie/master
This commit is contained in:
openharmony_ci 2023-11-01 11:33:09 +00:00 committed by Gitee
commit 6e433b3bfd
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
16 changed files with 1342 additions and 308 deletions

View File

@ -14,4 +14,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

View File

@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import time
import os
@ -23,7 +22,8 @@ import threading
import platform
import subprocess
import sys
from xdevice import DeviceOsType, FilePermission
from xdevice import DeviceOsType
from xdevice import FilePermission
from xdevice import ProductForm
from xdevice import ReportException
from xdevice import IDevice
@ -35,12 +35,14 @@ from xdevice import HdcError
from xdevice import DeviceAllocationState
from xdevice import DeviceConnectorType
from xdevice import TestDeviceState
from xdevice import AdvanceDeviceOption
from xdevice import convert_serial
from xdevice import check_path_legal
from xdevice import start_standing_subprocess
from xdevice import stop_standing_subprocess
from xdevice import get_cst_time
from xdevice import Platform
from xdevice import AppInstallError
from xdevice import RpcNotRunningError
from ohos.environment.dmlib import HdcHelper
from ohos.environment.dmlib import CollectingOutputReceiver
@ -60,6 +62,7 @@ UITEST_COMMAND = "{} start-daemon 0123456789 &".format(UITEST_PATH)
NATIVE_CRASH_PATH = "/data/log/faultlog/temp"
JS_CRASH_PATH = "/data/log/faultlog/faultlogger"
ROOT_PATH = "/data/log/faultlog"
LOGLEVEL = ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"]
def perform_device_action(func):
@ -151,7 +154,7 @@ class Device(IDevice):
device_os_type = DeviceOsType.default
test_device_state = None
device_allocation_state = DeviceAllocationState.available
label = None
label = ProductForm.phone
log = platform_logger("Device")
device_state_monitor = None
reboot_timeout = 2 * 60 * 1000
@ -171,14 +174,17 @@ class Device(IDevice):
oh_module_package = None
module_ablity_name = None
_device_report_path = None
test_platform = Platform.ohos
_webview = None
model_dict = {
'default': ProductForm.phone,
'phone': ProductForm.phone,
'car': ProductForm.car,
'tv': ProductForm.television,
'watch': ProductForm.watch,
'tablet': ProductForm.tablet,
'2in1': ProductForm._2in1,
'nosdcard': ProductForm.phone
}
@ -191,7 +197,7 @@ class Device(IDevice):
def __eq__(self, other):
return self.device_sn == other.__get_serial__() and \
self.device_os_type == other.device_os_type
self.device_os_type == other.device_os_type
def __set_serial__(self, device_sn=""):
self.device_sn = device_sn
@ -215,13 +221,15 @@ class Device(IDevice):
self.device_sn, ConfigConst.recover_state))
return False
result = self.device_state_monitor.wait_for_device_available()
result = self.device_state_monitor.wait_for_device_available(self.reboot_timeout)
if result:
self.device_log_collector.restart_catch_device_log()
return result
def get_device_type(self):
self.label = self.model_dict.get("default", None)
model = self.get_property("const.product.devicetype",
abort_on_exception=True)
self.label = self.model_dict.get(model, ProductForm.phone)
def get_property(self, prop_name, retry=RETRY_ATTEMPTS,
abort_on_exception=False):
@ -393,15 +401,6 @@ class Device(IDevice):
stdout = self.execute_shell_command(command, timeout=5 * 1000,
output_flag=False, retry=retry,
abort_on_exception=True).strip()
if stdout:
if "fail" in stdout:
cmd = [HdcHelper.CONNECTOR_NAME, "list", "targets"]
stdout = exec_cmd(cmd)
LOG.debug("exec cmd list targets:{},current device_sn:{}".format(stdout, self.device_sn))
if stdout and (self.device_sn in stdout):
stdout = "true"
LOG.debug(stdout)
return stdout
def set_recover_state(self, state):
@ -420,6 +419,24 @@ class Device(IDevice):
state = getattr(self, ConfigConst.recover_state, default_state)
return state
def wait_for_boot_completion(self):
"""Waits for the device to boot up.
Returns:
True if the device successfully finished booting, False otherwise.
"""
return self.device_state_monitor.wait_for_boot_complete(self.reboot_timeout)
@classmethod
def check_recover_result(cls, recover_result):
return "true" in recover_result
@property
def device_log_collector(self):
if self._device_log_collector is None:
self._device_log_collector = DeviceLogCollector(self)
return self._device_log_collector
def close(self):
self.reconnecttimes = 0
@ -433,11 +450,11 @@ class Device(IDevice):
if self.is_abc:
self.stop_harmony_rpc(kill_all=False)
else:
self.remove_ports()
self.stop_harmony_rpc(kill_all=True)
# do proxy clean
if self.proxy_listener is not None:
self.proxy_listener(is_exception=False)
if self.proxy_listener is not None:
self.proxy_listener(is_exception=False)
self.remove_ports()
self.device_log_collector.stop_restart_catch_device_log()
@property
@ -455,6 +472,10 @@ class Device(IDevice):
# check uitest
self.check_uitest_status()
self._proxy = self.get_harmony()
except AppInstallError as error:
raise error
except RpcNotRunningError as error:
raise error
except Exception as error:
self._proxy = None
self.log.error("DeviceTest-10012 proxy:%s" % str(error))
@ -470,6 +491,8 @@ class Device(IDevice):
# check uitest
self.check_uitest_status()
self._abc_proxy = self.get_harmony(start_abc=True)
except RpcNotRunningError as error:
raise error
except Exception as error:
self._abc_proxy = None
self.log.error("DeviceTest-10012 abc_proxy:%s" % str(error))
@ -558,6 +581,8 @@ class Device(IDevice):
self.log.debug(str(error))
self.log.error('please check devicetest extension module is exist.')
raise Exception(ErrorMessage.Error_01437.Topic)
except AppInstallError as error:
raise error
except Exception as error:
self.log.debug(str(error))
self.log.error('root device init RPC error.')
@ -570,11 +595,15 @@ class Device(IDevice):
cmd = "aa start -a {}.ServiceAbility -b {}".format(DEVICETEST_HAP_PACKAGE_NAME, DEVICETEST_HAP_PACKAGE_NAME)
result = self.execute_shell_command(cmd)
self.log.debug('start devicetest ability, {}'.format(result))
if "successfully" not in result:
raise RpcNotRunningError("harmony {} rpc start failed".format("system" if self.is_abc else "normal"),
error_no=ErrorMessage.Error_01439.Code)
if not self.is_abc:
self.start_uitest()
time.sleep(1)
if not self.check_rpc_status(check_abc=False):
raise Exception("harmony rpc not running")
raise RpcNotRunningError("harmony {} rpc process not found".format("system" if self.is_abc else "normal"),
error_no=ErrorMessage.Error_01440.Code)
def start_abc_rpc(self, re_install_rpc=False):
if re_install_rpc:
@ -594,11 +623,12 @@ class Device(IDevice):
return
self.start_uitest()
time.sleep(1)
from devicetest.core.error_message import ErrorMessage
if not self.check_rpc_status(check_abc=True):
raise Exception("harmony abc rpc not running")
raise RpcNotRunningError("harmony abc rpc process not found", error_no=ErrorMessage.Error_01440.Code)
def stop_harmony_rpc(self, kill_all=True):
# abc妯″紡涓嬩粎鏉€鎺塪evicetest锛屽惁鍒欓兘鏉€鎺?
# only kill devicetest in abc mode, or kill all
proc_pids = self.get_devicetest_proc_pid()
if not kill_all:
proc_pids.pop()
@ -716,8 +746,13 @@ class Device(IDevice):
@summary: Reconnect the device.
'''
if not self.wait_for_boot_completion():
self._proxy = None
self._uitestdeamon = None
raise Exception("Reconnect timed out.")
if self._proxy:
# do proxy clean
if not self.is_abc and self.proxy_listener is not None:
self.proxy_listener(is_exception=True)
self.start_harmony_rpc(re_install_rpc=True)
self._h_port = self.get_local_port(start_abc=False)
cmd = "fport tcp:{} tcp:{}".format(
@ -728,10 +763,11 @@ class Device(IDevice):
except Exception as _:
time.sleep(3)
self._proxy.init(port=self._h_port, addr=self.host, device=self)
# do proxy clean
if not self.is_abc and self.proxy_listener is not None:
self.proxy_listener(is_exception=True)
if self.is_abc and self._abc_proxy:
# do proxy clean
if self.proxy_listener is not None:
self.proxy_listener(is_exception=True)
self.start_abc_rpc(re_install_rpc=True)
self._h_port = self.get_local_port(start_abc=True)
cmd = "fport tcp:{} tcp:{}".format(
@ -742,9 +778,6 @@ class Device(IDevice):
except Exception as _:
time.sleep(3)
self._abc_proxy.init(port=self._h_port, addr=self.host, device=self)
# do proxt clean
if self.proxy_listener is not None:
self.proxy_listener(is_exception=True)
if self._uitestdeamon is not None:
self._uitestdeamon.init(self)
@ -753,14 +786,6 @@ class Device(IDevice):
return self._proxy
return None
def wait_for_boot_completion(self):
"""Waits for the device to boot up.
Returns:
True if the device successfully finished booting, False otherwise.
"""
return self.device_state_monitor.wait_for_device_available(self.reboot_timeout)
def get_local_port(self, start_abc):
from devicetest.utils.util import get_forward_port
host = self.host
@ -796,10 +821,6 @@ class Device(IDevice):
cmd = "fport rm {}".format(data[0][1:-1])
self.connector_command(cmd, is_print=False)
@classmethod
def check_recover_result(cls, recover_result):
return "true" in recover_result
def take_picture(self, name):
'''
@summary: 截取手机屏幕图片并保存
@ -825,6 +846,12 @@ class Device(IDevice):
self.log.error("devicetest take_picture: {}".format(str(error)))
return path
def set_device_report_path(self, path):
self._device_report_path = path
def get_device_report_path(self):
return self._device_report_path
def execute_shell_in_daemon(self, command):
if self.host != "127.0.0.1":
cmd = [HdcHelper.CONNECTOR_NAME, "-s", "{}:{}".format(
@ -853,29 +880,20 @@ class Device(IDevice):
self._webview = WebView(self)
return self._webview
@property
def device_log_collector(self):
if self._device_log_collector is None:
self._device_log_collector = DeviceLogCollector(self)
return self._device_log_collector
def set_device_report_path(self, path):
self._device_log_path = path
def get_device_report_path(self):
return self._device_log_path
class DeviceLogCollector:
hilog_file_address = []
log_file_address = []
device = None
restart_proc = []
device_log_level = None
is_clear = True
def __init__(self, device):
self.device = device
def restart_catch_device_log(self):
self._sync_device_time()
for _, path in enumerate(self.hilog_file_address):
hilog_open = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
FilePermission.mode_755)
@ -892,12 +910,22 @@ class DeviceLogCollector:
self.log_file_address.clear()
def start_catch_device_log(self, log_file_pipe=None,
hilog_file_pipe=None):
hilog_file_pipe=None, **kwargs):
"""
Starts hdc log for each device in separate subprocesses and save
the logs in files.
"""
self._sync_device_time()
# set device log level
if not self.device_log_level:
log_level = kwargs.get("log_level", "DEBUG")
if log_level not in LOGLEVEL:
self.device_log_level = "DEBUG"
else:
self.device_log_level = log_level
cmd = "hilog -b {}".format(self.device_log_level)
self.device.execute_shell_command(cmd)
device_hilog_proc = None
if hilog_file_pipe:
command = "hilog"
@ -920,7 +948,8 @@ class DeviceLogCollector:
stop_standing_subprocess(proc)
self.device.log.debug("Stop catch device hilog.")
def start_hilog_task(self, log_size="10M"):
def start_hilog_task(self, **kwargs):
log_size = kwargs.get("log_size", "10M")
log_size = log_size.upper()
if re.search("^\d+[K?]$", log_size) is None \
and re.search("^\d+[M?]$", log_size) is None:
@ -937,23 +966,37 @@ class DeviceLogCollector:
log_size = "512M"
self._sync_device_time()
self.is_clear = kwargs.get("is_clear", True)
# clear device crash log
self.clear_crash_log()
# 先停止一下
# stop hilog task
cmd = "hilog -w stop"
self.device.execute_shell_command(cmd)
# 清空日志
cmd = "hilog -r"
self.device.execute_shell_command(cmd)
cmd = "rm -rf /data/log/hilog/*.gz"
# set device log level
if not self.device_log_level:
log_level = kwargs.get("log_level", "DEBUG")
if log_level not in LOGLEVEL:
self.device_log_level = "DEBUG"
else:
self.device_log_level = log_level
cmd = "hilog -b {}".format(self.device_log_level)
self.device.execute_shell_command(cmd)
# 开始日志任务 设置落盘文件个数最大值1000, 单个文件20M链接https://gitee.com/openharmony/hiviewdfx_hilog
cmd = "hilog -w start -l {} -n 1000".format(log_size)
out = self.device.execute_shell_command(cmd)
LOG.info("Execute command: {}, result is {}".format(cmd, out))
# 开启kmsg日志落盘任务
cmd = "hilog -w start -t kmsg -l {} -n 1000".format(log_size)
out = self.device.execute_shell_command(cmd)
LOG.info("Execute command: {}, result is {}".format(cmd, out))
def stop_hilog_task(self, log_name, **kwargs):
cmd = "hilog -w stop"
out = self.device.execute_shell_command(cmd)
self.device.execute_shell_command(cmd)
module_name = kwargs.get("module_name", None)
if module_name:
path = "{}/log/{}".format(self.device.get_device_report_path(), module_name)
@ -976,14 +1019,18 @@ class DeviceLogCollector:
LOG.info("Execute command: {}, result is {}".format(cmd, out))
if out is not None and "No space left on device" not in out:
self.device.pull_file("/data/log/{}_hilog.tar.gz".format(log_name), path)
cmd = "rm -rf /data/log/{}_hilog.tar.gz".format(log_name)
cmd = "rm -f /data/log/{}_hilog.tar.gz".format(log_name)
self.device.execute_shell_command(cmd)
# 获取crash日志
# check if clear log
if self.is_clear:
cmd = "rm -f /data/log/hilog/*.gz"
self.device.execute_shell_command(cmd)
# get crash log
self.start_get_crash_log(log_name, module_name=module_name)
# 获取额外路径的日志
# get extra log
self.pull_extra_log_files(log_name, module_name, kwargs.get("extras_dirs", None))
def _get_log(self, log_cmd, *params):
def _get_log(self, log_path, *params):
def filter_by_name(log_name, args):
for starts_name in args:
if log_name.startswith(starts_name):
@ -992,72 +1039,61 @@ class DeviceLogCollector:
data_list = list()
log_name_array = list()
log_result = self.device.execute_shell_command(log_cmd)
log_result = self.device.execute_shell_command("ls {}".format(log_path))
if log_result is not None and len(log_result) != 0:
log_name_array = log_result.strip().replace("\r", "").split("\n")
for log_name in log_name_array:
log_name = log_name.strip()
if len(params) == 0 or \
filter_by_name(log_name, params):
data_list.append(log_name)
data_list.append("{}/{}".format(log_path, log_name))
return data_list
def get_cur_crash_log(self, crash_path, log_name):
log_name_map = {'cppcrash': NATIVE_CRASH_PATH,
"jscrash": JS_CRASH_PATH,
"SERVICE_BLOCK": ROOT_PATH,
"appfreeze": ROOT_PATH}
if not os.path.exists(crash_path):
os.makedirs(crash_path)
if "Not support std mode" in log_name:
return
def get_log_path(logname):
name_array = logname.split("-")
if len(name_array) <= 1:
return ROOT_PATH
return log_name_map.get(name_array[0])
log_path = get_log_path(log_name)
temp_path = "%s/%s" % (log_path, log_name)
self.device.pull_file(temp_path, crash_path)
LOG.debug("Finish pull file: %s" % log_name)
def start_get_crash_log(self, task_name, **kwargs):
def get_cur_crash_log(local_path, device_path):
if not os.path.exists(local_path):
os.makedirs(local_path)
if "Not support std mode" in device_path:
return
self.device.pull_file(device_path, local_path)
LOG.debug("Finish pull file: %s" % device_path)
module_name = kwargs.get("module_name", None)
log_array = list()
native_crash_cmd = "ls {}".format(NATIVE_CRASH_PATH)
js_crash_cmd = '"ls {} | grep jscrash"'.format(JS_CRASH_PATH)
block_crash_cmd = '"ls {}"'.format(ROOT_PATH)
# 获取crash日志文件
log_array.extend(self._get_log(native_crash_cmd, "cppcrash"))
log_array.extend(self._get_log(js_crash_cmd, "jscrash"))
log_array.extend(self._get_log(block_crash_cmd, "SERVICE_BLOCK", "appfreeze"))
# get crash log
log_array.extend(self._get_log(NATIVE_CRASH_PATH, "cppcrash"))
log_array.extend(self._get_log(JS_CRASH_PATH, "jscrash", "appfreeze", "cppcrash"))
log_array.extend(self._get_log(ROOT_PATH, "SERVICE_BLOCK", "appfreeze"))
LOG.debug("crash log file {}, length is {}".format(str(log_array), str(len(log_array))))
if module_name:
crash_path = "{}/log/{}/{}_crash_log/".format(self.device.get_device_report_path(), module_name, task_name)
crash_path = "{}/log/{}/crash_log_{}/".format(self.device.get_device_report_path(), module_name, task_name)
else:
crash_path = "{}/log/crash_log_{}/".format(self.device.get_device_report_path(), task_name)
for log_name in log_array:
log_name = log_name.strip()
self.get_cur_crash_log(crash_path, log_name)
get_cur_crash_log(crash_path, log_name)
def clear_crash_log(self):
clear_block_crash_cmd = "rm -f {}/*".format(ROOT_PATH)
clear_native_crash_cmd = "rm -f {}/*".format(NATIVE_CRASH_PATH)
clear_debug_crash_cmd = "rm -f {}/debug/*".format(ROOT_PATH)
clear_js_crash_cmd = "rm -f {}/*".format(JS_CRASH_PATH)
self.device.execute_shell_command(clear_block_crash_cmd)
self.device.execute_shell_command(clear_native_crash_cmd)
self.device.execute_shell_command(clear_debug_crash_cmd)
self.device.execute_shell_command(clear_js_crash_cmd)
if not self.is_clear:
LOG.debug("No need to clear crash log.")
return
def execute_clear_cmd(path: str, prefix: list):
for pre in prefix:
clear_cmd = "rm -f {}/{}*".format(path, pre)
self.device.execute_shell_command(clear_cmd)
execute_clear_cmd(ROOT_PATH, ["SERVICE_BLOCK", "appfreeze"])
execute_clear_cmd(NATIVE_CRASH_PATH, ["cppcrash"])
execute_clear_cmd(JS_CRASH_PATH, ["jscrash", "appfreeze", "cppcrash"])
def _sync_device_time(self):
# 先同步PC和设备的时间
iso_time_format = '%Y-%m-%d %H:%M:%S'
cur_time = get_cst_time().strftime(iso_time_format)
self.device.execute_shell_command("date '{}'".format(cur_time))
self.device.execute_shell_command("hwclock --systohc")
def add_log_address(self, log_file_address, hilog_file_address):
# record to restart catch log when reboot device
@ -1073,7 +1109,7 @@ class DeviceLogCollector:
self.hilog_file_address.remove(hilog_file_address)
def pull_extra_log_files(self, task_name, module_name, dirs: str):
if dirs is None:
if dirs is None or dirs == 'None':
return
dir_list = dirs.split(";")
if len(dir_list) > 0:
@ -1083,3 +1119,9 @@ class DeviceLogCollector:
os.makedirs(extra_log_path)
for dir_path in dir_list:
self.device.pull_file(dir_path, extra_log_path)
# check if delete file
if self.device.is_directory(dir_path):
clear_cmd = "rm -f {}/*".format(dir_path)
else:
clear_cmd = "rm -f {}*".format(dir_path)
self.device.execute_shell_command(clear_cmd)

View File

@ -1034,6 +1034,13 @@ def run_command(device, command):
device.reboot()
elif command.strip() == "reboot-delay":
pass
elif command.strip().startswith("wait"):
command_list = command.split(" ")
if command_list and len(command_list) > 1:
secs = int(command_list[1])
LOG.debug("Start wait {} secs".format(secs))
time.sleep(secs)
stdout = "Finish wait 10 secs"
elif command.strip().endswith("&"):
device.execute_shell_in_daemon(command.strip())
else:

View File

@ -43,6 +43,7 @@ from _core.exception import ShellCommandUnresponsiveException
from _core.exception import DeviceUnresponsiveException
from _core.exception import AppInstallError
from _core.exception import HapNotSupportTest
from _core.exception import RpcNotRunningError
from _core.constants import DeviceTestType
from _core.constants import DeviceLabelType
from _core.constants import ManagerType
@ -61,6 +62,8 @@ from _core.constants import FilePermission
from _core.constants import HostDrivenTestType
from _core.constants import DeviceConnectorType
from _core.constants import AdvanceDeviceOption
from _core.constants import LifeStage
from _core.constants import Platform
from _core.config.config_manager import UserConfigManager
from _core.config.resource_manager import ResourceManager
from _core.executor.listener import CaseResult
@ -69,6 +72,7 @@ from _core.executor.listener import SuitesResult
from _core.executor.listener import StateRecorder
from _core.executor.listener import TestDescription
from _core.executor.listener import CollectingTestListener
from _core.executor.request import Task
from _core.testkit.json_parser import JsonParser
from _core.testkit.kit import junit_para_parse
from _core.testkit.kit import gtest_para_parse
@ -94,6 +98,7 @@ from _core.utils import do_module_kit_teardown
from _core.utils import convert_serial
from _core.utils import convert_ip
from _core.utils import convert_port
from _core.utils import convert_mac
from _core.utils import check_mode
from _core.utils import get_filename_extension
from _core.utils import get_test_component_version
@ -106,7 +111,9 @@ from _core.utils import get_shell_handler
from _core.utils import get_decode
from _core.utils import get_cst_time
from _core.utils import get_delta_time_ms
from _core.utils import get_device_proc_pid
from _core.utils import get_netstat_proc_pid
from _core.utils import calculate_elapsed_time
from _core.utils import check_mode_in_sys
from _core.utils import start_standing_subprocess
from _core.utils import stop_standing_subprocess
from _core.logger import LogQueue
@ -136,7 +143,10 @@ from _core.report.reporter_helper import DataHelper
from _core.report.__main__ import main_report
from _core.command.console import Console
VERSION = "2.30.0.1104"
__all__ = [
"VERSION",
"Variables",
"Console",
"platform_logger",
@ -163,6 +173,7 @@ __all__ = [
"DeviceUnresponsiveException",
"AppInstallError",
"HapNotSupportTest",
"RpcNotRunningError",
"DeviceTestType",
"DeviceLabelType",
"ManagerType",
@ -189,6 +200,7 @@ __all__ = [
"StateRecorder",
"TestDescription",
"CollectingTestListener",
"Task",
"Scheduler",
"SuiteReporter",
"DeviceSelectionOption",
@ -232,6 +244,7 @@ __all__ = [
"convert_serial",
"convert_ip",
"convert_port",
"convert_mac",
"check_mode",
"get_filename_extension",
"get_test_component_version",
@ -244,14 +257,18 @@ __all__ = [
"get_decode",
"get_cst_time",
"get_delta_time_ms",
"get_device_proc_pid",
"get_netstat_proc_pid",
"calculate_elapsed_time",
"check_mode_in_sys",
"start_standing_subprocess",
"stop_standing_subprocess",
"ExecInfo",
"ResultReporter",
"DataHelper",
"main_report",
"LogQueue"
"LogQueue",
"LifeStage",
"Platform"
]

View File

@ -19,7 +19,7 @@ import os
import sys
from xdevice import Console
from xdevice import platform_logger
from _core.utils import get_version
from xdevice import VERSION
srcpath = os.path.dirname(os.path.dirname(__file__))
@ -31,7 +31,7 @@ LOG = platform_logger("Main")
def main_process(command=None):
LOG.info(
"*************** xDevice Test Framework %s Starting ***************" %
get_version())
VERSION)
if command:
args = str(command).split(" ")
args.insert(0, "xDevice")

View File

@ -156,6 +156,7 @@ class UserConfigManager(object):
nodes = self.config_content.find(target_name)
if attr_name in nodes.attrib:
return nodes.attrib.get(attr_name)
return None
def get_com_device(self, target_name):
devices = []
@ -255,5 +256,11 @@ class UserConfigManager(object):
else:
data_dic.update({ConfigConst.tag_enable: str(node.text).strip()})
data_dic.update({ConfigConst.tag_dir: None})
data_dic.update({ConfigConst.tag_loglevel: "DEBUG"})
return data_dic
def environment_enable(self):
if self.config_content.find("environment") or\
self.config_content.find("environment/device"):
return True
return False

View File

@ -47,6 +47,7 @@ class ProductForm(object):
television = "tv"
watch = "watch"
tablet = 'tablet'
_2in1 = '2in1'
@dataclass
@ -292,6 +293,8 @@ class ConfigConst(object):
device_log_off = "OFF"
tag_dir = "dir"
tag_enable = "enable"
tag_loglevel = "loglevel"
tag_clear = "clear"
query_resource = "query_resource"
env_pool_cache = "env_pool_cache"
@ -330,3 +333,22 @@ class AdvanceDeviceOption(object):
version_cmd = "version_cmd"
label = "label"
@dataclass
class Platform(object):
"""
Platform enumeration
"""
ohos = "OpenHarmony"
@dataclass
class LifeStage(object):
"""
LifeStage enumeration
"""
task_start = "TaskStart"
task_end = "TaskEnd"
case_start = "CaseStart"
case_end = "CaseEnd"

View File

@ -146,4 +146,14 @@ class HapNotSupportTest(DeviceError):
self.error_no = error_no
def __str__(self):
return str(self.error_msg)
return str(self.error_msg)
class RpcNotRunningError(DeviceError):
def __init__(self, error_msg, error_no=""):
super(RpcNotRunningError, self).__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)

View File

@ -40,7 +40,7 @@ class Descriptor:
The descriptor for a test or suite
"""
def __init__(self, uuid=None, name=None, source=None, container=False):
def __init__(self, uuid=None, name=None, source=None, container=False, error=None):
self.unique_id = uuid
self.display_name = name
self.tags = {}
@ -48,6 +48,7 @@ class Descriptor:
self.parent = None
self.children = []
self.container = container
self.error = error
def get_container(self):
return self.container
@ -63,6 +64,7 @@ class Task:
EMPTY_TASK = "empty"
TASK_CONFIG_SUFFIX = ".json"
TASK_CONFIG_DIR = "config"
life_stage_listener = None
def __init__(self, root=None, drivers=None, config=None):
self.root = root

View File

@ -23,6 +23,7 @@ import queue
import time
import uuid
import shutil
import json
from xml.etree import ElementTree
from _core.utils import unique_id
@ -30,6 +31,7 @@ from _core.utils import check_mode
from _core.utils import get_sub_path
from _core.utils import get_filename_extension
from _core.utils import convert_serial
from _core.utils import convert_mac
from _core.utils import get_instance_name
from _core.utils import is_config_str
from _core.utils import check_result_report
@ -42,7 +44,6 @@ from _core.exception import LiteDeviceError
from _core.exception import DeviceError
from _core.interface import LifeCycle
from _core.executor.request import Request
from _core.executor.request import Task
from _core.executor.request import Descriptor
from _core.plugin import get_plugin
from _core.plugin import Plugin
@ -60,6 +61,7 @@ from _core.constants import ListenerType
from _core.constants import ConfigConst
from _core.constants import ReportConst
from _core.constants import HostDrivenTestType
from _core.constants import LifeStage
from _core.executor.concurrent import DriversThread
from _core.executor.concurrent import QueueMonitorThread
from _core.executor.concurrent import DriversDryRunThread
@ -100,26 +102,33 @@ class Scheduler(object):
# the number of tests in current task
test_number = 0
device_labels = []
repeat_index = 0
auto_retry = -1
is_need_auto_retry = False
queue_monitor_thread = None
def __discover__(self, args):
"""Discover task to execute"""
from _core.executor.request import Task
config = Config()
config.update(args)
task = Task(drivers=[])
task.init(config)
Scheduler.call_life_stage_action(stage=LifeStage.task_start)
root_descriptor = self._find_test_root_descriptor(task.config)
task.set_root_descriptor(root_descriptor)
return task
def __execute__(self, task):
error_message = ""
unavailable = 0
try:
Scheduler.is_execute = True
if Scheduler.command_queue:
LOG.debug("Run command: %s" % Scheduler.command_queue[-1])
LOG.debug("Run command: {}".format(convert_mac(Scheduler.command_queue[-1])))
run_command = Scheduler.command_queue.pop()
task_id = str(uuid.uuid1()).split("-")[0]
Scheduler.command_queue.append((task_id, run_command,
@ -136,14 +145,27 @@ class Scheduler(object):
# do with the count of repeat about a task
if getattr(task.config, ConfigConst.repeat, 0) > 0:
Scheduler.repeat_index = \
getattr(task.config, ConfigConst.repeat)
drivers_list = list()
for repeat_index in range(task.config.repeat):
for driver_index in range(len(task.test_drivers)):
drivers_list.append(
copy.deepcopy(task.test_drivers[driver_index]))
for index in range(-1, task.config.repeat):
repeat_list = self.construct_repeat_list(task, index)
if repeat_list:
drivers_list.extend(repeat_list)
task.test_drivers = drivers_list
else:
Scheduler.repeat_index = 0
self.test_number = len(task.test_drivers)
for des in task.root.children:
if des.error:
error_message = "{};{}".format(des.error.error_msg, error_message)
unavailable += 1
if error_message != "":
error_message = "test source '{}' or its config json not exists".format(error_message)
LOG.error("Exec task error: {}".format(error_message))
raise ParamError(error_message)
if task.config.exectype == TestExecType.device_test:
self._device_test_execute(task)
@ -159,13 +181,15 @@ class Scheduler(object):
if error_no else str(exception)
error_no = error_no if error_no else "00000"
LOG.exception(exception, exc_info=False, error_no=error_no)
finally:
Scheduler.reset_test_dict_source()
if getattr(task.config, ConfigConst.test_environment, "") or \
getattr(task.config, ConfigConst.configfile, ""):
self._restore_environment()
Scheduler.call_life_stage_action(stage=LifeStage.task_end,
task=task, error=error_message,
unavailable=unavailable)
if Scheduler.upload_address:
Scheduler.upload_task_result(task, error_message)
Scheduler.upload_report_end()
@ -188,7 +212,7 @@ class Scheduler(object):
message_queue = queue.Queue()
# execute test drivers
queue_monitor_thread = self._start_queue_monitor(
self.queue_monitor_thread = self._start_queue_monitor(
message_queue, test_drivers, current_driver_threads)
while test_drivers:
if len(current_driver_threads) > 5:
@ -215,7 +239,7 @@ class Scheduler(object):
# wait for all drivers threads finished and do kit teardown
while True:
if not queue_monitor_thread.is_alive():
if not self.queue_monitor_thread.is_alive():
break
time.sleep(3)
@ -233,7 +257,7 @@ class Scheduler(object):
task_unused_env = []
# execute test drivers
queue_monitor_thread = self._start_queue_monitor(
self.queue_monitor_thread = self._start_queue_monitor(
message_queue, test_drivers, current_driver_threads)
while test_drivers:
# clear remaining test drivers when scheduler is terminated
@ -259,8 +283,9 @@ class Scheduler(object):
# start driver thread
thread_id = self._get_thread_id(current_driver_threads)
driver_thread = DriversDryRunThread(test_driver, task, environment,
message_queue)
driver_thread = DriversDryRunThread(test_driver, task,
environment,
message_queue)
driver_thread.setDaemon(True)
driver_thread.set_thread_id(thread_id)
driver_thread.start()
@ -270,13 +295,14 @@ class Scheduler(object):
# wait for all drivers threads finished and do kit teardown
while True:
if not queue_monitor_thread.is_alive():
if not self.queue_monitor_thread.is_alive():
break
time.sleep(3)
self._do_taskkit_teardown(used_devices, task_unused_env)
finally:
LOG.debug("Removing report_path: {}".format(task.config.report_path))
LOG.debug(
"Removing report_path: {}".format(task.config.report_path))
# delete reports
self.stop_task_logcat()
self.stop_encrypt_log()
@ -293,23 +319,29 @@ class Scheduler(object):
if used_devices:
serials = []
platforms = []
test_labels = []
for serial, device in used_devices.items():
serials.append(convert_serial(serial))
platform = str(device.label).capitalize()
platform = str(device.test_platform)
test_label = str(device.label).capitalize()
if platform not in platforms:
platforms.append(platform)
if test_label not in test_labels:
test_labels.append(test_label)
task_info.device_name = ",".join(serials)
task_info.platform = ",".join(platforms)
task_info.device_label = ",".join(test_labels)
else:
task_info.device_name = "None"
task_info.platform = "None"
task_info.device_label = "None"
task_info.test_time = task.config.start_time
task_info.product_info = getattr(task, "product_info", "")
listeners = self._create_listeners(task)
for listener in listeners:
listener.__ended__(LifeCycle.TestTask, task_info,
test_type=task_info.test_type)
test_type=task_info.test_type, task=task)
@classmethod
def _create_listeners(cls, task):
@ -365,6 +397,10 @@ class Scheduler(object):
while True:
if not Scheduler.is_execute:
break
if self.queue_monitor_thread and \
not self.queue_monitor_thread.is_alive():
LOG.error("Queue monitor thread is dead.")
break
environment = env_manager.apply_environment(device_options)
if len(environment.devices) == len(device_options):
return environment
@ -509,7 +545,7 @@ class Scheduler(object):
task_unused_env = []
# execute test drivers
queue_monitor_thread = self._start_queue_monitor(
self.queue_monitor_thread = self._start_queue_monitor(
message_queue, test_drivers, current_driver_threads)
while test_drivers:
# clear remaining test drivers when scheduler is terminated
@ -521,6 +557,10 @@ class Scheduler(object):
# get test driver and device
test_driver = test_drivers[0]
# call life stage
Scheduler.call_life_stage_action(stage=LifeStage.case_start,
case_name=test_driver[1].source.module_name)
if getattr(task.config, ConfigConst.history_report_path, ""):
module_name = test_driver[1].source.module_name
if not self.is_module_need_retry(task, module_name):
@ -542,6 +582,18 @@ class Scheduler(object):
task.config.__dict__, test_driver)
except DeviceError as exception:
self._handle_device_error(exception, task, test_drivers)
Scheduler.call_life_stage_action(stage=LifeStage.case_end,
case_name=test_driver[1].source.module_name,
case_result="Failed",
error_msg=exception.args)
continue
if not self.queue_monitor_thread.is_alive():
LOG.debug("Restart queue monitor thread.")
current_driver_threads = {}
message_queue = queue.Queue()
self.queue_monitor_thread = self._start_queue_monitor(
message_queue, test_drivers, current_driver_threads)
continue
if not Scheduler.is_execute:
@ -581,7 +633,7 @@ class Scheduler(object):
# wait for all drivers threads finished and do kit teardown
while True:
if not queue_monitor_thread.is_alive():
if not self.queue_monitor_thread.is_alive():
break
time.sleep(3)
@ -736,11 +788,9 @@ class Scheduler(object):
@classmethod
def _get_thread_id(cls, current_driver_threads):
thread_id = get_cst_time().strftime(
'%Y-%m-%d-%H-%M-%S-%f')
thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
while thread_id in current_driver_threads.keys():
thread_id = get_cst_time().strftime(
'%Y-%m-%d-%H-%M-%S-%f')
thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
return thread_id
@classmethod
@ -870,10 +920,6 @@ class Scheduler(object):
config, ConfigConst.testlist, "") or getattr(
config, ConfigConst.task, "") or getattr(
config, ConfigConst.testcase)
# read test list from testfile, testlist or task
test_set = getattr(config, "testfile", "") or getattr(
config, "testlist", "") or getattr(config, "task", "") or getattr(
config, "testcase")
if test_set:
fname, _ = get_filename_extension(test_set)
uid = unique_id("Scheduler", fname)
@ -888,6 +934,7 @@ class Scheduler(object):
@classmethod
def terminate_cmd_exec(cls):
Scheduler.is_execute = False
Scheduler.repeat_index = 0
Scheduler.auto_retry = -1
LOG.info("Start to terminate execution")
return Scheduler.terminate_result.get()
@ -915,7 +962,6 @@ class Scheduler(object):
return
result_file = exec_message.get_result()
request = exec_message.get_request()
test_name = request.root.source.test_name
if not result_file or not os.path.exists(result_file):
LOG.error("%s result not exists", test_name, error_no="00200")
@ -950,15 +996,18 @@ class Scheduler(object):
report_path = result_file
testsuites_element = DataHelper.parse_data_report(report_path)
start_time, end_time = cls._get_time(testsuites_element)
if request.get_test_type() == HostDrivenTestType.device_test:
test_type = request.get_test_type()
if test_type == HostDrivenTestType.device_test or test_type == HostDrivenTestType.windows_test:
for model_element in testsuites_element:
case_id = model_element.get(ReportConstant.name, "")
case_result, error = cls.get_script_result(model_element)
if error and len(error) > MAX_VISIBLE_LENGTH:
error = "$s..." % error[:MAX_VISIBLE_LENGTH]
error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
report = cls._get_report_path(
request.config.report_path,
model_element.get(ReportConstant.report, ""))
upload_params.append(
(case_id, case_result, error, start_time,
end_time, request.config.report_path,))
(case_id, case_result, error, start_time, end_time, report,))
else:
for testsuite_element in testsuites_element:
if check_mode(ModeType.developer):
@ -970,15 +1019,17 @@ class Scheduler(object):
for case_element in testsuite_element:
case_id = cls._get_case_id(case_element, module_name)
case_result, error = cls._get_case_result(case_element)
if error and len(error) > MAX_VISIBLE_LENGTH:
error = "%s..." % error[:MAX_VISIBLE_LENGTH]
if case_result == "Ignored":
LOG.info("Get upload params: %s result is ignored",
case_id)
LOG.info(
"Get upload params: {} result is ignored".format(case_id))
continue
if error and len(error) > MAX_VISIBLE_LENGTH:
error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
report = cls._get_report_path(
request.config.report_path,
case_element.get(ReportConstant.report, ""))
upload_params.append(
(case_id, case_result, error, start_time,
end_time, request.config.report_path,))
(case_id, case_result, error, start_time, end_time, report,))
return upload_params, start_time, end_time
@classmethod
@ -1002,6 +1053,9 @@ class Scheduler(object):
return result, ""
if Scheduler.mode == ModeType.decc:
result = "Failed"
result_kind = model_element.get(ReportConstant.result_kind, "")
if result_kind:
result = result_kind
error_msg = model_element.get(ReportConstant.message, "")
if not error_msg and len(model_element) > 0:
@ -1038,6 +1092,11 @@ class Scheduler(object):
result = "Blocked"
elif case.is_ignored():
result = "Ignored"
elif case.is_completed():
if case.message:
result = "Failed"
else:
result = "Passed"
else:
result = "Unavailable"
return result, case.message
@ -1061,18 +1120,31 @@ class Scheduler(object):
end_time = int(time.mktime(time.strptime(
timestamp, ReportConstant.time_format)) * 1000)
except ArithmeticError as error:
LOG.error("Get time error %s" % error)
LOG.error("Get time error {}".format(error))
end_time = int(time.time() * 1000)
except ValueError as error:
LOG.error("Get time error {}".format(error))
end_time = int(time.mktime(time.strptime(
timestamp.split(".")[0], ReportConstant.time_format)) * 1000)
start_time = int(end_time - float(cost_time) * 1000)
else:
current_time = int(time.time() * 1000)
start_time, end_time = current_time, current_time
except ArithmeticError as error:
LOG.error("Get time error %s" % error)
LOG.error("Get time error {}".format(error))
current_time = int(time.time() * 1000)
start_time, end_time = current_time, current_time
return start_time, end_time
@classmethod
def _get_report_path(cls, base_path, report=""):
""" get report path
base_path: str, report base path
report : str, report relative path
"""
report_path = os.path.join(base_path, report)
return report_path if report and os.path.exists(report_path) else base_path
@classmethod
def upload_task_result(cls, task, error_message=""):
if not Scheduler.task_name:
@ -1097,9 +1169,10 @@ class Scheduler(object):
error_msg, "%s;" % child.get(ReportConstant.message))
if error_msg:
error_msg = error_msg[:-1]
cls.upload_case_result((Scheduler.task_name, task_result,
error_msg, start_time, end_time,
task.config.report_path))
report = cls._get_report_path(
task.config.report_path, ReportConstant.summary_vision_report)
cls.upload_case_result(
(Scheduler.task_name, task_result, error_msg, start_time, end_time, report))
@classmethod
def _get_task_result(cls, task_element):
@ -1140,7 +1213,7 @@ class Scheduler(object):
failed_flag = False
if check_mode(ModeType.decc):
from xdevice import SuiteReporter
for module, failed in SuiteReporter.get_failed_case_list():
for module, _ in SuiteReporter.get_failed_case_list():
if module_name == module or str(module_name).split(
".")[0] == module:
failed_flag = True
@ -1151,9 +1224,11 @@ class Scheduler(object):
getattr(task.config, ConfigConst.history_report_path, "")
params = ResultReporter.get_task_info_params(history_report_path)
if params and params[ReportConst.unsuccessful_params]:
if dict(params[ReportConst.unsuccessful_params]).get(module_name, []):
if dict(params[ReportConst.unsuccessful_params]).get(
module_name, []):
failed_flag = True
elif dict(params[ReportConst.unsuccessful_params]).get(str(module_name).split(".")[0], []):
elif dict(params[ReportConst.unsuccessful_params]).get(
str(module_name).split(".")[0], []):
failed_flag = True
return failed_flag
@ -1180,6 +1255,7 @@ class Scheduler(object):
return True
if year_interval == 1 and month_interval + 12 in (1, 2):
return True
return False
@classmethod
def _parse_property_value(cls, property_name, driver_request, kit):
@ -1187,7 +1263,6 @@ class Scheduler(object):
driver_request.config.get(ConfigConst.testargs, dict()))
property_value = ""
if ConfigConst.pass_through in test_args.keys():
import json
pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
property_value = pt_dict.get(property_name, None)
elif property_name in test_args.keys:
@ -1288,6 +1363,16 @@ class Scheduler(object):
"Require subsystem=%s part=%s, no device match this"
% (module_name, _subsystem, _part))
@classmethod
def construct_repeat_list(cls, task, index):
repeat_list = list()
for driver_index in range(len(task.test_drivers)):
cur_test_driver = copy.deepcopy(task.test_drivers[driver_index])
desc = cur_test_driver[1]
desc.unique_id = '{}_{}'.format(desc.unique_id, index + 1)
repeat_list.append(cur_test_driver)
return repeat_list
@classmethod
def start_auto_retry(cls):
if not Scheduler.is_need_auto_retry:
@ -1304,6 +1389,54 @@ class Scheduler(object):
@classmethod
def check_auto_retry(cls, options):
if Scheduler.auto_retry < 0 and int(getattr(options, ConfigConst.auto_retry, 0)) > 0:
if Scheduler.auto_retry < 0 and \
int(getattr(options, ConfigConst.auto_retry, 0)) > 0:
value = int(getattr(options, ConfigConst.auto_retry, 0))
Scheduler.auto_retry = value if value <= 10 else 10
@staticmethod
def call_life_stage_action(**kwargs):
"""
call in different lift stage
"""
from xdevice import Task
from xdevice import Variables
if Task.life_stage_listener is None:
return
stage = kwargs.get("stage", None)
data = dict()
if stage == LifeStage.task_start:
data = {"type": stage, "name": Variables.task_name}
elif stage == LifeStage.task_end:
task = kwargs.get("task", None)
error = kwargs.get("error", "")
unavailable = kwargs.get("unavailable", 0)
summary_data_report = os.path.join(task.config.report_path,
ReportConstant.summary_data_report) if task else ""
if not os.path.exists(summary_data_report):
LOG.error("Call lifecycle error, summary report {} not exists".format(task.config.report_path))
passed = failures = blocked = 0
else:
task_element = ElementTree.parse(summary_data_report).getroot()
total_tests = int(task_element.get(ReportConstant.tests, 0))
failures = int(task_element.get(ReportConstant.failures, 0))
blocked = int(task_element.get(ReportConstant.disabled, 0))
ignored = int(task_element.get(ReportConstant.ignored, 0))
unavailable = int(task_element.get(ReportConstant.unavailable, 0))
passed = total_tests - failures - blocked - ignored
data = {"type": stage, "name": Variables.task_name,
"passed": passed, "failures": failures,
"blocked": blocked, "unavailable": unavailable,
"error": error}
elif stage == LifeStage.case_start:
case_name = kwargs.get("case_name", "")
data = {"type": stage, "name": case_name}
elif stage == LifeStage.case_end:
case_name = kwargs.get("case_name", "")
case_result = kwargs.get("case_result", "")
error_msg = kwargs.get("error_msg", "")
data = {"type": stage, "name": case_name, "case_result": case_result, "error_msg": error_msg}
else:
LOG.error("Call lifecycle error, error param stage: {}".format(stage))
return
Task.life_stage_listener(data)

View File

@ -393,8 +393,7 @@ def _create_descriptor(config_file, filename, test_source, test_type, config):
error_message = "test source '%s' or '%s' not exists" % (
test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX))
error_no = "00102"
if Scheduler.mode != ModeType.decc:
raise ParamError(error_message, error_no=error_no)
desc.error = ParamError(error_message, error_no=error_no)
if Scheduler.mode == ModeType.decc and error_message:
Scheduler.report_not_executed(config.report_path, [("", desc)],

View File

@ -0,0 +1,115 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from _core.report.reporter_helper import ReportConstant
from _core.report.reporter_helper import DataHelper
class RepeatHelper:
def __init__(self, report_path):
self.data_helper = DataHelper()
self.report_path = report_path
def __generate_repeat_xml__(self, summary_data_path):
from xdevice import Scheduler
if Scheduler.repeat_index == 0:
return
root_tree = self.data_helper.parse_data_report(summary_data_path)
modules = dict()
name_set = set()
for suite in root_tree:
module_name = suite.attrib.get(ReportConstant.module_name, "")
if not module_name:
continue
name_set.add(module_name)
name_in_suite = suite.attrib.get(ReportConstant.name, "")
uuid = "{}#{}".format(module_name, name_in_suite)
total = int(suite.attrib.get(ReportConstant.tests, 0))
if total == 0:
continue
if uuid not in modules.keys():
modules[uuid] = suite
continue
self._update_suite(modules, suite, uuid)
root_tree = self._update_root_tree(modules, root_tree)
root_tree.attrib.update({ReportConstant.modules: str(len(name_set))})
root_tree.attrib.update({ReportConstant.run_modules: str(len(name_set))})
file_name = r"{}\repeated.xml".format(self.report_path)
self.data_helper.generate_report(root_tree, file_name)
@classmethod
def _update_root_tree(cls, modules, root_tree):
for item in root_tree.findall(ReportConstant.test_suite):
root_tree.remove(item)
need_update_attributes = \
[ReportConstant.tests, ReportConstant.ignored,
ReportConstant.failures, ReportConstant.disabled,
ReportConstant.errors]
root_tree.attrib.update({ReportConstant.tests: "0"})
root_tree.attrib.update({ReportConstant.unavailable: "0"})
for _, test_suite in modules.items():
for update_attribute in need_update_attributes:
value = int(test_suite.attrib.get(update_attribute, 0))
value = int(root_tree.attrib.get(update_attribute, 0)) + value
root_tree.attrib.update({update_attribute: str(value)})
root_tree.append(test_suite)
return root_tree
def _update_suite(self, modules, suite, uuid):
for testcase in suite:
name = testcase.attrib.get(ReportConstant.name, "")
class_name = testcase.attrib.get(ReportConstant.class_name, "")
pattern = ".//{}[@{}='{}'][@{}='{}']".format(
ReportConstant.test_case, ReportConstant.class_name, class_name,
ReportConstant.name, name)
matched_case = modules[uuid].find(pattern)
if matched_case is None:
modules[uuid].append(testcase)
tests = int(
modules[uuid].attrib.get(ReportConstant.tests, 0)) + 1
modules[uuid].attrib[ReportConstant.tests] = str(tests)
status = self._need_update_status(testcase)
if status:
value = int(modules[uuid].attrib.get(
ReportConstant.status, 0)) + 1
modules[uuid].attrib[status] = str(value)
continue
if testcase.attrib.get(ReportConstant.result,
ReportConstant.false) == ReportConstant.true:
modules[uuid].remove(matched_case)
modules[uuid].append(testcase)
status = self._need_update_status(testcase)
if status:
value = int(modules[uuid].attrib.get(
ReportConstant.status, 0)) - 1
modules[uuid].attrib[status] = str(value)
@classmethod
def _need_update_status(cls, testcase):
status = testcase.attrib.get(ReportConstant.status, "")
result = testcase.attrib.get(ReportConstant.result, "")
if result == ReportConstant.true:
return ""
if status == ReportConstant.run:
return ReportConstant.failures
elif status == ReportConstant.skip:
return ReportConstant.ignored
else:
return ReportConstant.disabled

View File

@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
import platform
import time
@ -32,6 +32,15 @@ from _core.constants import FilePermission
LOG = platform_logger("ReporterHelper")
class CaseResult:
passed = "Passed"
failed = "Failed"
blocked = "Blocked"
ignored = "Ignored"
unavailable = "Unavailable"
investigated = "Investigated"
@dataclass
class ReportConstant:
# report name constants
@ -39,6 +48,8 @@ class ReportConstant:
summary_vision_report = "summary_report.html"
details_vision_report = "details_report.html"
failures_vision_report = "failures_report.html"
passes_vision_report = "passes_report.html"
ignores_vision_report = "ignores_report.html"
task_info_record = "task_info.record"
summary_ini = "summary.ini"
summary_report_hash = "summary_report.hash"
@ -46,6 +57,8 @@ class ReportConstant:
summary_title = "Summary Report"
details_title = "Details Report"
failures_title = "Failures Report"
passes_title = "Passes Report"
ignores_title = "Ignores Report"
# exec_info constants
platform = "platform"
@ -56,6 +69,7 @@ class ReportConstant:
log_path = "log_path"
log_path_title = "Log Path"
execute_time = "execute_time"
device_label = "device_label"
# summary constants
product_info = "productinfo"
@ -77,11 +91,13 @@ class ReportConstant:
unavailable = "unavailable"
not_run = "notrun"
message = "message"
report = "report"
# case result constants
module_name = "modulename"
module_name_ = "module_name"
result = "result"
result_kind = "result_kind"
status = "status"
run = "run"
true = "true"
@ -184,7 +200,7 @@ class DataHelper:
file_prefix = kwargs.get("file_prefix", None)
data_reports = cls._get_data_reports(report_path, file_prefix)
if not data_reports:
return
return None
if key:
data_reports.sort(key=key, reverse=reverse)
summary_result = None
@ -194,7 +210,7 @@ class DataHelper:
ReportConstant.unavailable]
for data_report in data_reports:
data_report_element = cls.parse_data_report(data_report)
if not len(list(data_report_element)):
if not list(data_report_element):
continue
if not summary_result:
summary_result = data_report_element
@ -286,7 +302,8 @@ class DataHelper:
class ExecInfo:
keys = [ReportConstant.platform, ReportConstant.test_type,
ReportConstant.device_name, ReportConstant.host_info,
ReportConstant.test_time, ReportConstant.execute_time]
ReportConstant.test_time, ReportConstant.execute_time,
ReportConstant.device_label]
test_type = ""
device_name = ""
host_info = ""
@ -295,6 +312,7 @@ class ExecInfo:
platform = ""
execute_time = ""
product_info = dict()
device_label = ""
class Result:
@ -334,12 +352,12 @@ class Summary:
class Suite:
keys = [ReportConstant.module_name_, ReportConstant.name,
ReportConstant.total, ReportConstant.passed,
ReportConstant.failed, ReportConstant.blocked,
ReportConstant.ignored, ReportConstant.time]
ReportConstant.time, ReportConstant.total, ReportConstant.passed,
ReportConstant.failed, ReportConstant.blocked, ReportConstant.ignored]
module_name = ReportConstant.empty_name
name = ""
time = ""
report = ""
def __init__(self):
self.message = ""
@ -350,7 +368,7 @@ class Suite:
return self.cases
def set_cases(self, element):
if len(element) == 0:
if not element:
LOG.debug("%s has no testcase",
element.get(ReportConstant.name, ""))
return
@ -383,6 +401,7 @@ class Case:
result = ""
message = ""
time = ""
report = ""
def is_passed(self):
if self.result == ReportConstant.true and \
@ -407,6 +426,9 @@ class Case:
def is_ignored(self):
return self.status in [ReportConstant.skip, ReportConstant.not_run]
def is_completed(self):
return self.result == ReportConstant.completed
def get_result(self):
if self.is_failed():
return ReportConstant.failed
@ -436,6 +458,8 @@ class VisionHelper:
def __init__(self):
from xdevice import Variables
self.summary_element = None
self.device_logs = None
self.report_path = ""
self.template_name = os.path.join(Variables.res_dir, "template",
"report.html")
@ -468,6 +492,9 @@ class VisionHelper:
end_time, ReportConstant.time_format))
exec_info.execute_time = self.get_execute_time(round(
end_time - start_time, 3))
exec_info.device_label = getattr(task_info,
ReportConstant.device_label,
"None")
exec_info.log_path = os.path.abspath(os.path.join(report_path, "log"))
try:
@ -525,6 +552,7 @@ class VisionHelper:
ReportConstant.empty_name)
suite.name = child.get(ReportConstant.name, "")
suite.message = child.get(ReportConstant.message, "")
suite.report = child.get(ReportConstant.report, "")
suite.result.total = int(child.get(ReportConstant.tests)) if \
child.get(ReportConstant.tests) else 0
suite.result.failed = int(child.get(ReportConstant.failures)) if \
@ -552,9 +580,8 @@ class VisionHelper:
render_target=ReportConstant.summary_vision_report):
exec_info, summary, suites = parsed_data
if not os.path.exists(self.template_name):
LOG.error("Template file not exists. {}".format(self.template_name))
LOG.error("Template file not exists, {}".format(self.template_name))
return ""
with open(self.template_name) as file:
file_context = file.read()
file_context = self._render_key("", ReportConstant.title_name,
@ -567,8 +594,12 @@ class VisionHelper:
file_context = self._render_cases(file_context, suites)
elif render_target == ReportConstant.failures_vision_report:
file_context = self._render_failure_cases(file_context, suites)
elif render_target == ReportConstant.passes_vision_report:
file_context = self._render_pass_cases(file_context, suites)
elif render_target == ReportConstant.ignores_vision_report:
file_context = self._render_ignore_cases(file_context, suites)
else:
LOG.error("Unsupported vision report type: %s", render_target)
LOG.error("Unsupported vision report type: {}".format(render_target))
return file_context
@classmethod
@ -692,28 +723,33 @@ class VisionHelper:
suite record sample:
<table class="suites">
<tr>
<th class="title" colspan="9">Test detail</th>
<td class='tasklog'>TaskLog:</td>
<td class='normal' colspan='8' style="border-bottom: 1px #E8F0FD solid;">
<a href='log/task_log.log'>task_log.log</a>
</td>
</tr>
<tr>
<th class="normal module">Module</th>
<th class="normal test-suite">Testsuite</th>
<th class="normal testsuite">Testsuite</th>
<th class="normal time">Time(sec)</th>
<th class="normal total">Total Tests</th>
<th class="normal passed">Passed</th>
<th class="normal failed">Failed</th>
<th class="normal blocked">Blocked</th>
<th class="normal ignored">Ignored</th>
<th class="normal time">Time</th>
<th class="normal operate">Operate</th>
</tr>
<tr [class="background-color"]>
<td class="normal module">{suite.module_name}</td>
<td class="normal test-suite">{suite.name}</td>
<td class='normal testsuite'>
<a href='{suite.report}'>{suite.name}</a> or {suite.name}
</td>
<td class="normal time">{suite.time}</td>
<td class="normal total">{suite.result.total}</td>
<td class="normal passed">{suite.result.passed}</td>
<td class="normal failed">{suite.result.failed}</td>
<td class="normal blocked">{suite.result.blocked}</td>
<td class="normal ignored">{suite.result.ignored}</td>
<td class="normal time">{suite.time}</td>
<td class="normal operate">
<a href="details_report.html#{suite.name}" or
"failures_report.html#{suite.name}">
@ -726,7 +762,8 @@ class VisionHelper:
replace_str = "<!--{suites.context}-->"
suites_context = "<table class='suites'>\n"
suites_context = "%s%s" % (suites_context, self._get_suites_title())
suites_context += self._get_task_log()
suites_context += self._get_suites_title()
for index, suite in enumerate(suites):
# construct suite context
suite_name = getattr(suite, "name", self.PLACE_HOLDER)
@ -738,8 +775,13 @@ class VisionHelper:
text = getattr(result, key, self.PLACE_HOLDER)
else:
text = getattr(suite, key, self.PLACE_HOLDER)
suite_context = "{}{}".format(
suite_context, self._add_suite_td_context(key, text))
if key == ReportConstant.name:
report = getattr(suite, ReportConstant.report, self.PLACE_HOLDER)
temp = "<td class='normal testsuite'>{}</td>\n ".format(
"<a href='{}'>{}</a>".format(report, text) if report else text)
else:
temp = self._add_suite_td_context(key, text)
suite_context = "{}{}".format(suite_context, temp)
if suite.result.total == 0:
href = "%s#%s" % (
ReportConstant.failures_vision_report, suite_name)
@ -756,20 +798,86 @@ class VisionHelper:
suites_context = "%s</table>\n" % suites_context
return file_context.replace(replace_str, suites_context)
def _get_task_log(self):
logs = [f for f in os.listdir(os.path.join(self.report_path, 'log')) if f.startswith('task_log.log')]
link = ["<a href='log/{task_log}'>{task_log}</a>".format(task_log=file_name) for file_name in logs]
temp = "<tr>\n" \
" <td class='tasklog'>TaskLog:</td>\n" \
" <td class='normal' colspan='8' style='border-bottom: 1px #E8F0FD solid;'>{}</td>\n" \
"</tr>".format(' | '.join(link))
return temp
def _get_testsuite_device_log(self, module_name, suite_name):
log_index, log_name = 0, 'device_log'
hilog_index, hilog_name = 0, 'device_hilog'
logs = []
for r in self._get_device_logs():
if (r.startswith(log_name) or r.startswith(hilog_name)) \
and ((module_name and module_name in r) or suite_name in r):
logs.append(r)
if not logs:
return ''
link = []
for name in sorted(logs):
display_name = ''
if name.startswith(log_name):
display_name = log_name
if log_index != 0:
display_name = log_name + str(log_index)
log_index += 1
if name.startswith(hilog_name):
display_name = hilog_name
if hilog_index != 0:
display_name = hilog_name + str(hilog_index)
hilog_index += 1
link.append("<a href='{}'>{}</a>".format(os.path.join('log', name), display_name))
ele = "<tr>\n" \
" <td class='devicelog' style='border-bottom: 1px #E8F0FD solid;'>DeviceLog:</td>\n" \
" <td class='normal' colspan='6' style='border-bottom: 1px #E8F0FD solid;'>\n" \
" {}\n" \
" </td>\n" \
"</tr>".format(' | '.join(link))
return ele
def _get_testcase_device_log(self, case_name):
log_name, hilog_name = 'device_log', 'device_hilog'
logs = [r for r in self._get_device_logs()
if case_name in r and (log_name in r or hilog_name in r) and r.endswith('.log')]
if not logs:
return '-'
link = []
for name in sorted(logs):
display_name = ''
if log_name in name:
display_name = log_name
if hilog_name in name:
display_name = hilog_name
link.append("<a href='{}'>{}</a>".format(os.path.join('log', name), display_name))
return '<br>'.join(link)
def _get_device_logs(self):
if self.device_logs is not None:
return self.device_logs
result = []
pth = os.path.join(self.report_path, 'log')
for top, _, nondirs in os.walk(pth):
for filename in nondirs:
if filename.startswith('device_log') or filename.startswith('device_hilog'):
result.append(os.path.join(top, filename).replace(pth, '')[1:])
self.device_logs = result
return result
@classmethod
def _get_suites_title(cls):
suites_title = "<tr>\n" \
" <th class='title' colspan='9'>Test detail</th>\n" \
"</tr>\n" \
"<tr>\n" \
" <th class='normal module'>Module</th>\n" \
" <th class='normal test-suite'>Testsuite</th>\n" \
" <th class='normal total'>Total Tests</th>\n" \
" <th class='normal testsuite'>Testsuite</th>\n" \
" <th class='normal time'>Time(sec)</th>\n" \
" <th class='normal total'>Tests</th>\n" \
" <th class='normal passed'>Passed</th>\n" \
" <th class='normal failed'>Failed</th>\n" \
" <th class='normal blocked'>Blocked</th>\n" \
" <th class='normal ignored'>Ignored</th>\n" \
" <th class='normal time'>Time</th>\n" \
" <th class='normal operate'>Operate</th>\n" \
"</tr>\n"
return suites_title
@ -792,25 +900,37 @@ class VisionHelper:
<span class="return"></span></a>
</th>
</tr>
<tr>
<td class='devicelog' style='border-bottom: 1px #E8F0FD solid;'>DeviceLog:</td>
<td class='normal' colspan='5' style='border-bottom: 1px #E8F0FD solid;'>
<a href='log/device_log_xx.log'>device_log</a> | <a href='log/device_hilog_xx.log'>device_hilog</a>
</td>
</tr>
<tr>
<th class="normal module">Module</th>
<th class="normal test-suite">Testsuite</th>
<th class="normal testsuite">Testsuite</th>
<th class="normal test">Testcase</th>
<th class="normal time">Time</th>
<th class="normal status"><div class="circle-normal
circle-white"></div></th>
<th class="normal time">Time(sec)</th>
<th class="normal status">
<div class="circle-normal circle-white"></div>
</th>
<th class="normal result">Result</th>
<th class='normal logs'>Logs</th>
</tr>
<tr [class="background-color"]>
<td class="normal module">{case.module_name}</td>
<td class="normal test-suite">{case.classname}</td>
<td class="normal test">{case.name}</td>
<td class="normal testsuite">{case.classname}</td>
<td class="normal test">
<a href='{case.report}'>{case.name}</a> or {case.name}
</td>
<td class="normal time">{case.time}</td>
<td class="normal status"><div class="circle-normal
circle-{case.result/status}"></div></td>
<td class="normal result">
[<a href="failures_report.html#{suite.name}.{case.name}">]
{case.result/status}[</a>]</td>
{case.result/status}[</a>]
</td>
<td class='normal logs'>-</td>
</tr>
...
</table>
@ -820,22 +940,17 @@ class VisionHelper:
cases_context = ""
for suite in suites:
# construct case context
module_name = suite.cases[0].module_name if suite.cases else ""
suite_name = getattr(suite, "name", self.PLACE_HOLDER)
case_context = "<table class='test-suite'>\n"
case_context = "{}{}".format(case_context,
self._get_case_title(suite_name))
case_context += self._get_case_title(module_name, suite_name)
for index, case in enumerate(suite.cases):
case_context = "{}{}".format(
case_context,
self._get_case_td_context(index, case, suite_name))
case_context = "%s</table>\n" % case_context
# add case context to cases context
cases_context = "{}{}".format(cases_context, case_context)
case_context += self._get_case_td_context(index, case, suite_name)
case_context += "\n</table>\n"
cases_context += case_context
return file_context.replace(replace_str, cases_context)
@classmethod
def _get_case_td_context(cls, index, case, suite_name):
def _get_case_td_context(self, index, case, suite_name):
result = case.get_result()
rendered_result = result
if result != ReportConstant.passed and \
@ -843,23 +958,34 @@ class VisionHelper:
rendered_result = "<a href='%s#%s.%s'>%s</a>" % \
(ReportConstant.failures_vision_report,
suite_name, case.name, result)
if result == ReportConstant.passed:
rendered_result = "<a href='{}#{}.{}'>{}</a>".format(
ReportConstant.passes_vision_report, suite_name, case.name, result)
if result == ReportConstant.ignored:
rendered_result = "<a href='{}#{}.{}'>{}</a>".format(
ReportConstant.ignores_vision_report, suite_name, case.name, result)
report = case.report
test_name = "<a href='{}'>{}</a>".format(report, case.name) if report else case.name
case_td_context = "<tr>\n" if index % 2 == 0 else \
"<tr class='background-color'>\n"
case_td_context = "{}{}".format(
case_td_context,
" <td class='normal module'>%s</td>\n"
" <td class='normal test-suite'>%s</td>\n"
" <td class='normal testsuite'>%s</td>\n"
" <td class='normal test'>%s</td>\n"
" <td class='normal time'>%s</td>\n"
" <td class='normal status'>"
"<div class='circle-normal circle-%s'></div></td>\n"
" <td class='normal status'>\n"
" <div class='circle-normal circle-%s'></div>\n"
" </td>\n"
" <td class='normal result'>%s</td>\n"
"</tr>\n" % (case.module_name, case.classname, case.name,
case.time, result, rendered_result))
" <td class='normal logs'>%s</td>\n"
"</tr>\n" % (case.module_name, case.classname, test_name,
case.time, result, rendered_result, self._get_testcase_device_log(case.name)))
return case_td_context
@classmethod
def _get_case_title(cls, suite_name):
def _get_case_title(self, module_name, suite_name):
case_title = \
"<tr>\n" \
" <th class='title' colspan='4' id='%s'>\n" \
@ -868,16 +994,18 @@ class VisionHelper:
" <span class='return'></span></a>\n" \
" </th>\n" \
"</tr>\n" \
"%s\n" \
"<tr>\n" \
" <th class='normal module'>Module</th>\n" \
" <th class='normal test-suite'>Testsuite</th>\n" \
" <th class='normal testsuite'>Testsuite</th>\n" \
" <th class='normal test'>Testcase</th>\n" \
" <th class='normal time'>Time</th>\n" \
" <th class='normal time'>Time(sec)</th>\n" \
" <th class='normal status'><div class='circle-normal " \
"circle-white'></div></th>\n" \
" <th class='normal result'>Result</th>\n" \
" <th class='normal logs'>Logs</th>\n" \
"</tr>\n" % (suite_name, suite_name,
ReportConstant.summary_vision_report)
ReportConstant.summary_vision_report, self._get_testsuite_device_log(module_name, suite_name))
return case_title
def _render_failure_cases(self, file_context, suites):
@ -951,32 +1079,228 @@ class VisionHelper:
"{}{}".format(failure_cases_context, case_context)
return file_context.replace(replace_str, failure_cases_context)
def _render_pass_cases(self, file_context, suites):
"""construct pass cases context and render it to file context
failure case table sample:
<table class="pass-test">
<tr>
<th class="title" colspan="4" id="{suite.name}">
<span class="title">{suite.name}&nbsp;&nbsp;</span>
<a href="details_report.html#{suite.name}" or
"summary_report.html#summary">
<span class="return"></span></a>
</th>
</tr>
<tr>
<th class="normal test">Test</th>
<th class="normal status"><div class="circle-normal
circle-white"></div></th>
<th class="normal result">Result</th>
<th class="normal details">Details</th>
</tr>
<tr [class="background-color"]>
<td class="normal test" id="{suite.name}">
{suite.module_name}#{suite.name}</td>
or
<td class="normal test" id="{suite.name}.{case.name}">
{case.module_name}#{case.classname}#{case.name}</td>
<td class="normal status"><div class="circle-normal
circle-{case.result/status}"></div></td>
<td class="normal result">{case.result/status}</td>
<td class="normal details">{case.message}</td>
</tr>
...
</table>
...
"""
file_context = file_context.replace("failure-test", "pass-test")
replace_str = "<!--{failures.context}-->"
pass_cases_context = ""
for suite in suites:
if (suite.result.total > 0 and suite.result.total == (
suite.result.failed + suite.result.ignored + suite.result.blocked)) or \
suite.result.unavailable != 0:
continue
# construct pass cases context for pass suite
suite_name = getattr(suite, "name", self.PLACE_HOLDER)
case_context = "<table class='pass-test'>\n"
case_context = \
"{}{}".format(case_context, self._get_failure_case_title(
suite_name, suite.result.total))
skipped_num = 0
for index, case in enumerate(suite.cases):
result = case.get_result()
if result == ReportConstant.failed or \
result == ReportConstant.ignored or result == ReportConstant.blocked:
skipped_num += 1
continue
case_context = "{}{}".format(
case_context, self._get_pass_case_td_context(
index - skipped_num, case, suite_name, result))
case_context = "{}</table>\n".format(case_context)
# add case context to cases context
pass_cases_context = \
"{}{}".format(pass_cases_context, case_context)
return file_context.replace(replace_str, pass_cases_context)
def _render_ignore_cases(self, file_context, suites):
file_context = file_context.replace("failure-test", "ignore-test")
replace_str = "<!--{failures.context}-->"
ignore_cases_context = ""
for suite in suites:
if (suite.result.total > 0 and suite.result.total == (
suite.result.failed + suite.result.ignored + suite.result.blocked)) or \
suite.result.unavailable != 0:
continue
# construct pass cases context for pass suite
suite_name = getattr(suite, "name", self.PLACE_HOLDER)
case_context = "<table class='ignore-test'>\n"
case_context = \
"{}{}".format(case_context, self._get_failure_case_title(
suite_name, suite.result.total))
skipped_num = 0
for index, case in enumerate(suite.cases):
result = case.get_result()
if result == ReportConstant.failed or \
result == ReportConstant.passed or result == ReportConstant.blocked:
skipped_num += 1
continue
case_context = "{}{}".format(
case_context, self._get_ignore_case_td_context(
index - skipped_num, case, suite_name, result))
case_context = "{}</table>\n".format(case_context)
# add case context to cases context
ignore_cases_context = "{}{}".format(ignore_cases_context, case_context)
return file_context.replace(replace_str, ignore_cases_context)
@classmethod
def _get_pass_case_td_context(cls, index, case, suite_name, result):
pass_case_td_context = "<tr>\n" if index % 2 == 0 else \
"<tr class='background-color'>\n"
test_context = "{}#{}#{}".format(case.module_name, case.classname, case.name)
href_id = "{}.{}".format(suite_name, case.name)
detail_data = "-"
if hasattr(case, "normal_screen_urls"):
detail_data += "Screenshot: {}<br>".format(
cls._get_screenshot_url_context(case.normal_screen_urls))
pass_case_td_context += " <td class='normal test' id='{}'>{}</td>\n" \
" <td class='normal status'>\n" \
" <div class='circle-normal circle-{}'></div>\n" \
" </td>\n" \
" <td class='normal result'>{}</td>\n" \
" <td class='normal details'>\n" \
" {}\n" \
" </td>\n" \
"</tr>\n".format(href_id, test_context, result, result, detail_data)
return pass_case_td_context
@classmethod
def _get_ignore_case_td_context(cls, index, case, suite_name, result):
ignore_case_td_context = "<tr>\n" if index % 2 == 0 else \
"<tr class='background-color'>\n"
test_context = "{}#{}#{}".format(case.module_name, case.classname, case.name)
href_id = "{}.{}".format(suite_name, case.name)
result_info = {}
if hasattr(case, "result_info") and case.result_info:
result_info = json.loads(case.result_info)
detail_data = ""
actual_info = result_info.get("actual", "")
if actual_info:
detail_data += "actual:&nbsp;{}<br>".format(actual_info)
except_info = result_info.get("except", "")
if except_info:
detail_data += "except:&nbsp;{}<br>".format(except_info)
filter_info = result_info.get("filter", "")
if filter_info:
detail_data += "filter:&nbsp;{}<br>".format(filter_info)
if not detail_data:
detail_data = "-"
ignore_case_td_context += " <td class='normal test' id='{}'>{}</td>\n" \
" <td class='normal status'>\n" \
" <div class='circle-normal circle-{}'></div></td>\n" \
" <td class='normal result'>{}</td>\n" \
" <td class='normal details'>\n" \
" {}\n" \
" </td>\n" \
"</tr>\n".format(
href_id, test_context, result, result, detail_data)
return ignore_case_td_context
@classmethod
def _get_screenshot_url_context(cls, url):
context = ""
if not url:
return ""
paths = cls._find_png_file_path(url)
for path in paths:
context += "<br><a href='{0}'>{1}</a>".format(path, path)
return context
@classmethod
def _find_png_file_path(cls, url):
if not url:
return []
last_index = url.rfind("\\")
if last_index < 0:
return []
start_str = url[0:last_index]
end_str = url[last_index + 1:len(url)]
if not os.path.exists(start_str):
return []
paths = []
for file in os.listdir(start_str):
if end_str in file:
whole_path = os.path.join(start_str, file)
l_index = whole_path.rfind("screenshot")
relative_path = whole_path[l_index:]
paths.append(relative_path)
return paths
@classmethod
def _get_failure_case_td_context(cls, index, case, suite_name, result):
failure_case_td_context = "<tr>\n" if index % 2 == 0 else \
"<tr class='background-color'>\n"
if result == ReportConstant.unavailable:
test_context = "%s#%s" % (case.module_name, case.name)
test_context = "{}#{}".format(case.module_name, case.name)
href_id = suite_name
else:
test_context = \
"%s#%s#%s" % (case.module_name, case.classname, case.name)
href_id = "%s.%s" % (suite_name, case.name)
test_context = "{}#{}#{}".format(case.module_name, case.classname, case.name)
href_id = "{}.{}".format(suite_name, case.name)
details_context = case.message
detail_data = ""
if hasattr(case, "normal_screen_urls"):
detail_data += "Screenshot: {}<br>".format(
cls._get_screenshot_url_context(case.normal_screen_urls))
if hasattr(case, "failure_screen_urls"):
detail_data += "Screenshot_On_Failure: {}<br>".format(
cls._get_screenshot_url_context(case.failure_screen_urls))
if details_context:
details_context = str(details_context).replace("<", "&lt;"). \
replace(">", "&gt;").replace("\\r\\n", "<br/>"). \
replace("\\n", "<br/>").replace("\n", "<br/>"). \
detail_data += str(details_context).replace("<", "&lt;"). \
replace(">", "&gt;").replace("\\r\\n", "<br>"). \
replace("\\n", "<br>").replace("\n", "<br>"). \
replace(" ", "&nbsp;")
failure_case_td_context = "{}{}".format(
failure_case_td_context,
" <td class='normal test' id='%s'>%s</td>\n"
" <td class='normal status'>"
"<div class='circle-normal circle-%s'></div></td>\n"
" <td class='normal result'>%s</td>\n"
" <td class='normal details'>%s</td>\n"
"</tr>\n" %
(href_id, test_context, result, result, details_context))
failure_case_td_context += " <td class='normal test' id='{}'>{}</td>\n" \
" <td class='normal status'>" \
" <div class='circle-normal circle-{}'></div>" \
" </td>\n" \
" <td class='normal result'>{}</td>\n" \
" <td class='normal details'>\n" \
" {}" \
" </td>\n" \
"</tr>\n".format(href_id, test_context, result, result, detail_data)
return failure_case_td_context
@classmethod
@ -1022,4 +1346,4 @@ class VisionHelper:
vision_file.write(bytes(report_context, "utf-8", "ignore"))
vision_file.flush()
vision_file.close()
LOG.info("Generate vision report: %s", summary_vision_path)
LOG.info("Generate vision report: file:///%s", summary_vision_path.replace("\\", "/"))

View File

@ -16,6 +16,9 @@
# limitations under the License.
#
import collections
import copy
import json
import os
import platform
import shutil
@ -23,6 +26,7 @@ import time
import zipfile
from importlib import util
from ast import literal_eval
from xml.etree import ElementTree
from _core.interface import IReporter
from _core.plugin import Plugin
@ -31,18 +35,52 @@ from _core.constants import TestType
from _core.constants import FilePermission
from _core.logger import platform_logger
from _core.exception import ParamError
from _core.utils import calculate_elapsed_time
from _core.utils import copy_folder
from _core.utils import get_filename_extension
from _core.report.encrypt import check_pub_key_exist
from _core.report.encrypt import do_rsa_encrypt
from _core.report.encrypt import get_file_summary
from _core.report.reporter_helper import CaseResult
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import ExecInfo
from _core.report.reporter_helper import VisionHelper
from _core.report.reporter_helper import ReportConstant
from _core.report.repeater_helper import RepeatHelper
from xdevice import Variables
LOG = platform_logger("ResultReporter")
class ResultSummary:
def __init__(self):
self.modules = 0
self.runmodules = 0
self.tests = 0
self.passed = 0
self.failed = 0
self.blocked = 0
self.ignored = 0
self.unavailable = 0
def get_data(self):
LOG.info(f"Summary result: modules: {self.modules}, run modules: {self.runmodules}, "
f"total: {self.tests}, passed: {self.passed}, failed: {self.failed}, "
f"blocked: {self.blocked}, ignored: {self.ignored}, unavailable: {self.unavailable}")
data = {
"modules": self.modules,
"runmodules": self.runmodules,
"tests": self.tests,
"passed": self.passed,
"failed": self.failed,
"blocked": self.blocked,
"ignored": self.ignored,
"unavailable": self.unavailable
}
return data
@Plugin(type=Plugin.REPORTER, id=TestType.all)
class ResultReporter(IReporter):
summary_report_result = []
@ -56,6 +94,13 @@ class ResultReporter(IReporter):
self.parsed_data = None
self.data_helper = None
self.vision_helper = None
self.repeat_helper = None
self.summary = ResultSummary()
# task_record.info数据
self._failed_cases = []
self.record_params = {}
self.record_reports = {}
def __generate_reports__(self, report_path, **kwargs):
LOG.info("")
@ -72,7 +117,7 @@ class ResultReporter(IReporter):
self._generate_vision_reports()
# generate task info record
self._generate_task_info_record()
self._generate_task_record()
# generate summary ini
self._generate_summary()
@ -107,8 +152,218 @@ class ResultReporter(IReporter):
self.exec_info = task_info
self.data_helper = DataHelper()
self.vision_helper = VisionHelper()
self.vision_helper.report_path = report_path
self.repeat_helper = RepeatHelper(report_path)
return True
def _generate_test_report(self):
report_template = os.path.join(Variables.res_dir, "template")
copy_folder(report_template, self.report_path)
content = json.dumps(self._get_summary_data())
data_js = os.path.join(self.report_path, "static", "data.js")
data_fd = os.open(data_js, os.O_CREAT | os.O_WRONLY, FilePermission.mode_644)
with os.fdopen(data_fd, mode="w", encoding="utf-8") as jsf:
jsf.write(f"window.reportData = {content}")
test_report = os.path.join(self.report_path, ReportConstant.summary_vision_report).replace("\\", "/")
LOG.info(f"Log path: {self.report_path}")
LOG.info(f"Generate test report: file:///{test_report}")
# 重新生成对象避免在retry场景数据统计有误
self.summary = ResultSummary()
def _get_summary_data(self):
modules = []
for data_report, _ in self.data_reports:
if data_report.endswith(ReportConstant.summary_data_report):
continue
info = self._parse_module(data_report)
if info is not None:
modules.append(info)
if self.summary.failed != 0 or self.summary.blocked != 0 or self.summary.unavailable != 0:
from xdevice import Scheduler
Scheduler.is_need_auto_retry = True
data = {
"exec_info": self._get_exec_info(),
"summary": self.summary.get_data(),
"modules": modules,
}
return data
def _get_exec_info(self):
start_time = self.task_info.test_time
end_time = time.strftime(ReportConstant.time_format, time.localtime())
test_time = "%s/ %s" % (start_time, end_time)
execute_time = calculate_elapsed_time(
time.mktime(time.strptime(start_time, ReportConstant.time_format)),
time.mktime(time.strptime(end_time, ReportConstant.time_format)))
host_info = platform.platform()
device_name = getattr(self.task_info, ReportConstant.device_name, "None")
device_type = getattr(self.task_info, ReportConstant.device_label, "None")
platform_info = getattr(self.task_info, ReportConstant.platform, "None")
test_type = getattr(self.task_info, ReportConstant.test_type, "None")
# 为报告文件summary.ini提供数据
exec_info = ExecInfo()
exec_info.device_name = device_name
exec_info.device_label = device_type
exec_info.execute_time = execute_time
exec_info.host_info = host_info
exec_info.log_path = self.report_path
exec_info.platform = platform_info
exec_info.test_time = test_time
exec_info.test_type = test_type
self.exec_info = exec_info
info = {
"platform": platform_info,
"test_type": test_type,
"device_name": device_name,
"device_type": device_type,
"test_time": test_time,
"execute_time": execute_time,
"host_info": host_info
}
return info
def _parse_module(self, xml_file):
"""解析测试模块"""
file_name = os.path.basename(xml_file)
try:
ele_module = ElementTree.parse(xml_file).getroot()
except ElementTree.ParseError:
LOG.error(f"parse module result error, result file {file_name}")
return None
module = ResultReporter._count_result(ele_module)
module_name = file_name[:-4] if module.name == "" else module.name
suites = [self._parse_testsuite(ele_suite) for ele_suite in ele_module]
# 为报告文件task_record.info提供数据
self.record_reports.update({module_name: xml_file})
if len(self._failed_cases) != 0:
self.record_params.update({module_name: copy.copy(self._failed_cases)})
self._failed_cases.clear()
self.summary.modules += 1
self.summary.tests += module.tests
self.summary.passed += module.passed
self.summary.failed += module.failed
self.summary.blocked += module.blocked
self.summary.ignored += module.ignored
if module.unavailable == 0:
self.summary.runmodules += 1
else:
self.summary.unavailable += 1
module_report, module_time = module.report, module.time
if len(suites) == 1 and suites[0].get(ReportConstant.name) == module_name:
report = suites[0].get(ReportConstant.report)
if report != "":
module_report = report
module_time = suites[0].get(ReportConstant.time)
info = {
"name": module_name,
"report": module_report,
"time": module_time,
"execute_time": calculate_elapsed_time(0, module_time),
"tests": module.tests,
"passed": module.passed,
"failed": module.failed,
"blocked": module.blocked,
"ignored": module.ignored,
"unavailable": module.unavailable,
"passingrate": "0%" if module.tests == 0 else "{:.0%}".format(module.passed / module.tests),
"productinfo": ResultReporter._parse_product_info(ele_module),
"suites": suites
}
return info
def _parse_testsuite(self, ele_suite):
"""解析测试套"""
suite = ResultReporter._count_result(ele_suite)
cases = [self._parse_testcase(case) for case in ele_suite]
info = {
"name": suite.name,
"report": suite.report,
"time": suite.time,
"tests": suite.tests,
"passed": suite.passed,
"failed": suite.failed,
"blocked": suite.blocked,
"ignored": suite.ignored,
"passingrate": "0%" if suite.tests == 0 else "{:.0%}".format(suite.passed / suite.tests),
"cases": cases
}
return info
def _parse_testcase(self, ele_case):
"""解析测试用例"""
name = ele_case.get(ReportConstant.name)
class_name = ele_case.get(ReportConstant.class_name, "")
result = ResultReporter._get_case_result(ele_case)
if result != CaseResult.passed:
self._failed_cases.append(f"{class_name}#{name}")
return [name, class_name, result, ResultReporter._parse_time(ele_case),
ele_case.get(ReportConstant.message, ""), ele_case.get(ReportConstant.report, "")]
@staticmethod
def _parse_time(ele):
try:
_time = float(ele.get(ReportConstant.time, "0"))
except ValueError:
_time = 0.0
LOG.error("parse test time error, set it to 0.0")
return _time
@staticmethod
def _parse_product_info(ele_module):
product_info = ele_module.get(ReportConstant.product_info, "")
if product_info == "":
return {}
try:
return literal_eval(product_info)
except SyntaxError:
return {}
@staticmethod
def _count_result(ele):
name = ele.get(ReportConstant.name, "")
report = ele.get(ReportConstant.report, "")
_time = ResultReporter._parse_time(ele)
tests = int(ele.get(ReportConstant.tests, "0"))
failed = int(ele.get(ReportConstant.failures, "0"))
disabled = ele.get(ReportConstant.disabled, "0")
if disabled == "":
disabled = "0"
errors = ele.get(ReportConstant.errors, "0")
if errors == "":
errors = "0"
blocked = int(disabled) + int(errors)
ignored = int(ele.get(ReportConstant.ignored, "0"))
unavailable = int(ele.get(ReportConstant.unavailable, "0"))
tmp_pass = tests - failed - blocked - ignored
passed = tmp_pass if tmp_pass > 0 else 0
Result = collections.namedtuple(
'Result',
['name', 'report', 'time', 'tests', 'passed', 'failed', 'blocked', 'ignored', 'unavailable'])
return Result(name, report, _time, tests, passed, failed, blocked, ignored, unavailable)
def _get_case_result(ele_case):
result_kind = ele_case.get(ReportConstant.result_kind, "")
if result_kind != "":
return result_kind
result = ele_case.get(ReportConstant.result, "")
status = ele_case.get(ReportConstant.status, "")
if result == ReportConstant.false and (status == ReportConstant.run or status == ""):
return CaseResult.failed
if status in [ReportConstant.blocked, ReportConstant.disabled, ReportConstant.error]:
return CaseResult.blocked
if status in [ReportConstant.skip, ReportConstant.not_run]:
return CaseResult.ignored
if status in [ReportConstant.unavailable]:
return CaseResult.unavailable
return CaseResult.passed
def _generate_data_report(self):
# initial element
test_suites_element = self.data_helper.initial_suites_element()
@ -145,7 +400,9 @@ class ResultReporter(IReporter):
if module_name == ReportConstant.empty_name:
module_name = self._get_module_name(data_report, root)
total = int(root.get(ReportConstant.tests, 0))
modules[module_name] = modules.get(module_name, 0) + total
if module_name not in modules.keys():
modules[module_name] = list()
modules[module_name].append(total)
self._append_product_info(test_suites_attributes, root)
for child in root:
@ -177,14 +434,7 @@ class ResultReporter(IReporter):
return False
# set test suites element attributes and children
modules_zero = [module_name for module_name, total in modules.items()
if total == 0]
if modules_zero:
LOG.info("The total tests of %s module is 0", ",".join(
modules_zero))
test_suites_attributes[ReportConstant.run_modules] = \
len(modules) - len(modules_zero)
test_suites_attributes[ReportConstant.modules] = len(modules)
self._handle_module_tests(modules, test_suites_attributes)
self.data_helper.set_element_attributes(test_suites_element,
test_suites_attributes)
test_suites_element.extend(test_suite_elements)
@ -299,7 +549,8 @@ class ResultReporter(IReporter):
summary.result.ignored, summary.result.unavailable)
LOG.info("Log path: %s", self.exec_info.log_path)
if summary.result.failed != 0 or summary.result.blocked != 0 or summary.result.unavailable != 0:
if summary.result.failed != 0 or summary.result.blocked != 0 or\
summary.result.unavailable != 0:
from xdevice import Scheduler
Scheduler.is_need_auto_retry = True
@ -322,6 +573,16 @@ class ResultReporter(IReporter):
parsed_data, ReportConstant.failures_title,
ReportConstant.failures_vision_report)
# generate passes vision report
if summary.result.passed != 0:
self._generate_vision_report(
parsed_data, ReportConstant.passes_title, ReportConstant.passes_vision_report)
# generate ignores vision report
if summary.result.ignored != 0:
self._generate_vision_report(
parsed_data, ReportConstant.ignores_title, ReportConstant.ignores_vision_report)
def _generate_vision_report(self, parsed_data, title, render_target):
# render data
@ -395,15 +656,18 @@ class ResultReporter(IReporter):
return
summary_ini_content = \
"[default]\n" \
"Platform=%s\n" \
"Test Type=%s\n" \
"Device Name=%s\n" \
"Host Info=%s\n" \
"Test Start/ End Time=%s\n" \
"Execution Time=%s\n" % (
"Platform={}\n" \
"Test Type={}\n" \
"Device Name={}\n" \
"Host Info={}\n" \
"Test Start/ End Time={}\n" \
"Execution Time={}\n" \
"Device Type={}\n".format(
self.exec_info.platform, self.exec_info.test_type,
self.exec_info.device_name, self.exec_info.host_info,
self.exec_info.test_time, self.exec_info.execute_time)
self.exec_info.test_time, self.exec_info.execute_time,
self.exec_info.device_label)
if self.exec_info.product_info:
for key, value in self.exec_info.product_info.items():
summary_ini_content = "{}{}".format(
@ -436,13 +700,13 @@ class ResultReporter(IReporter):
file_handler.write(bytes(summary_ini_content, 'utf-8'))
file_handler.flush()
LOG.info("Generate summary ini: %s", summary_filepath)
self.repeat_helper.__generate_repeat_xml__(self.summary_data_path)
def _copy_report(self):
from xdevice import Scheduler
if Scheduler.upload_address or self._check_mode(ModeType.decc):
return
from xdevice import Variables
dst_path = os.path.join(Variables.temp_dir, "latest")
try:
shutil.rmtree(dst_path, ignore_errors=True)
@ -461,11 +725,11 @@ class ResultReporter(IReporter):
def _compress_report_folder(self):
if self._check_mode(ModeType.decc) or \
self._check_mode(ModeType.factory):
return
return None
if not os.path.isdir(self.report_path):
LOG.error("'%s' is not folder!" % self.report_path)
return
return None
# get file path list
file_path_list = []
@ -510,7 +774,7 @@ class ResultReporter(IReporter):
from xdevice import Scheduler
return Scheduler.mode == mode
def _generate_task_info_record(self):
def _generate_task_record(self):
# under encryption status, don't handle anything directly
if check_pub_key_exist() and not self._check_mode(ModeType.decc):
return
@ -521,18 +785,22 @@ class ResultReporter(IReporter):
return
_, command, report_path = Scheduler.command_queue[-1]
# handle parsed data
record = self._parse_record_from_data(command, report_path)
record_info = {
"command": command,
"session_id": os.path.split(report_path)[-1],
"report_path": report_path,
"unsuccessful_params": self.record_params,
"data_reports": self.record_reports
}
def encode(content):
# inner function to encode
return ' '.join([bin(ord(c)).replace('0b', '') for c in content])
# write into file
import json
record_file = os.path.join(self.report_path,
ReportConstant.task_info_record)
_record_json = json.dumps(record, indent=2)
_record_json = json.dumps(record_info, indent=2)
with open(file=record_file, mode="wb") as file:
if Scheduler.mode == ModeType.decc:
@ -579,7 +847,7 @@ class ResultReporter(IReporter):
data_reports = dict()
if self._check_mode(ModeType.decc):
from xdevice import SuiteReporter
for module_name, report_path, report_result in \
for module_name, report_path, _ in \
SuiteReporter.get_history_result_list():
if module_name in module_set:
data_reports.update({module_name: report_path})
@ -600,8 +868,10 @@ class ResultReporter(IReporter):
return ()
def decode(content):
return ''.join([chr(i) for i in [int(b, 2) for b in
content.split(' ')]])
result_list = []
for b in content.split(' '):
result_list.append(chr(int(b, 2)))
return ''.join(result_list)
record_path = os.path.join(history_path,
ReportConstant.task_info_record)
@ -609,7 +879,6 @@ class ResultReporter(IReporter):
LOG.error("%s not exists!", ReportConstant.task_info_record)
return ()
import json
from xdevice import Scheduler
with open(record_path, mode="rb") as file:
if Scheduler.mode == ModeType.decc:
@ -634,6 +903,7 @@ class ResultReporter(IReporter):
def get_result_of_summary_report(cls):
if cls.summary_report_result:
return cls.summary_report_result[0][1]
return None
@classmethod
def summary_report_result_exists(cls):
@ -643,6 +913,7 @@ class ResultReporter(IReporter):
def get_path_of_summary_report(cls):
if cls.summary_report_result:
return cls.summary_report_result[0][0]
return None
@classmethod
def _write_long_size_file(cls, zip_object, long_size_file):
@ -658,12 +929,10 @@ class ResultReporter(IReporter):
shutil.copyfileobj(src, des, 1024 * 1024 * 8)
def _transact_all(self):
from xdevice import Variables
tools_dir = os.path.join(Variables.res_dir, "tools", "binder.pyc")
if not os.path.exists(tools_dir):
pyc_path = os.path.join(Variables.res_dir, "tools", "binder.pyc")
if not os.path.exists(pyc_path):
return
module_spec = util.spec_from_file_location(
"binder", tools_dir)
module_spec = util.spec_from_file_location("binder", pyc_path)
if not module_spec:
return
module = util.module_from_spec(module_spec)
@ -671,3 +940,19 @@ class ResultReporter(IReporter):
if hasattr(module, "transact") and callable(module.transact):
module.transact(self, LOG)
del module
@classmethod
def _handle_module_tests(cls, modules, test_suites_attributes):
modules_list = list()
modules_zero = list()
for module_name, detail_list in modules.items():
for total in detail_list:
modules_list.append(total)
if total == 0:
modules_zero.append(module_name)
test_suites_attributes[ReportConstant.run_modules] = \
len(modules_list) - len(modules_zero)
test_suites_attributes[ReportConstant.modules] = len(modules_list)
if modules_zero:
LOG.info("The total tests of %s module is 0", ",".join(
modules_zero))

View File

@ -73,7 +73,7 @@ class JsonParser:
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(path_or_content, flags, modes),
"r") as file_content:
"r", encoding="utf-8") as file_content:
json_content = json.load(file_content)
except (TypeError, ValueError, AttributeError) as error:
raise ParamError("json file error: %s %s" % (

View File

@ -18,6 +18,8 @@
import copy
import os
import re
import shutil
import socket
import sys
import time
@ -115,8 +117,6 @@ def get_decode(stream):
def is_proc_running(pid, name=None):
if hasattr(sys, ConfigConst.env_pool_cache) and getattr(sys, ConfigConst.env_pool_cache, False):
return True
if platform.system() == "Windows":
pid = "{}.exe".format(pid)
proc_sub = subprocess.Popen(["C:\\Windows\\System32\\tasklist"],
@ -395,6 +395,7 @@ def modify_props(device, local_prop_file, target_prop_file, new_props):
def get_device_log_file(report_path, serial=None, log_name="device_log",
device_name="", module_name=None):
from xdevice import Variables
# new a module folder to save log
if module_name:
log_path = os.path.join(report_path, Variables.report_vars.log_dir, module_name)
else:
@ -412,7 +413,7 @@ def get_device_log_file(report_path, serial=None, log_name="device_log",
def check_result_report(report_root_dir, report_file, error_message="",
report_name="", module_name=""):
report_name="", module_name="", **kwargs):
"""
Check whether report_file exits or not. If report_file is not exist,
create empty report with error_message under report_root_dir
@ -440,8 +441,9 @@ def check_result_report(report_root_dir, report_file, error_message="",
suite_result.stacktrace = error_message
if module_name:
suite_name = module_name
suite_reporter = SuiteReporter([(suite_result, [])], suite_name,
result_dir, modulename=module_name)
suite_reporter = SuiteReporter(
[(suite_result, [])], suite_name, result_dir, modulename=module_name,
is_monkey=kwargs.get("is_monkey", False), device_up_info=kwargs.get("device_up_info", None))
suite_reporter.create_empty_report()
return "%s.xml" % os.path.join(result_dir, suite_name)
@ -472,26 +474,6 @@ def is_python_satisfied():
return False
def get_version():
from xdevice import Variables
ver = ''
ver_file_path = os.path.join(Variables.res_dir, 'version.txt')
if not os.path.isfile(ver_file_path):
return ver
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(ver_file_path, flags, modes),
"rb") as ver_file:
content_list = ver_file.read().decode("utf-8").split("\n")
for line in content_list:
if line.strip() and "-v" in line:
ver = line.strip().split('-')[1]
ver = ver.split(':')[0][1:]
break
return ver
def get_instance_name(instance):
return instance.__class__.__name__
@ -525,6 +507,23 @@ def convert_serial(serial):
serial[0:length], "*" * (len(serial) - length * 2), serial[-length:])
def convert_mac(message):
if isinstance(message, list):
return message
pattern = r'.+\'hcptest\':\'(.+)\''
pattern2 = r'.+pass_through:.+\'hcptest\':\'(.+)\''
result1 = re.match(pattern, message)
result2 = re.search(pattern2, message)
if result1 or result2:
result = result1 if result1 else result2
result = result.group(1)
length = len(result) // 8
convert_mes = "{}{}{}".format(result[0:length], "*" * (len(result) - length * 2), result[-length:])
return message.replace(result, convert_mes)
else:
return message
def get_shell_handler(request, parser_type):
suite_name = request.root.source.test_name
parsers = get_plugin(Plugin.PARSER, parser_type)
@ -699,40 +698,112 @@ def do_module_kit_teardown(request):
setattr(device, ConfigConst.module_kits, [])
def get_current_time():
current_time = time.time()
local_time = time.localtime(current_time)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
return "%s" % (data_head)
def check_mode_in_sys(mode):
if not hasattr(sys, "mode"):
return False
return getattr(sys, "mode") == mode
def get_cst_time():
cn_tz = timezone(timedelta(hours=8),
name='Asia/ShangHai')
return datetime.now(tz=cn_tz)
sh_tz = timezone(
timedelta(hours=8),
name='Asia/Shanghai',
)
return datetime.now(tz=sh_tz)
def get_delta_time_ms(start_time):
end_time = get_cst_time()
delta = round(float((end_time - start_time).total_seconds()) * 1000, 3)
delta = (end_time - start_time).total_seconds() * 1000
return delta
def get_device_proc_pid(device, proc_name, double_check=False):
def get_netstat_proc_pid(device, port):
if not hasattr(device, "execute_shell_command") or \
not hasattr(device, "log") or \
not hasattr(device, "get_recover_state"):
return ""
if not device.get_recover_state():
return ""
cmd = 'ps -ef | grep %s' % proc_name
cmd = 'netstat -anp | grep {}'.format(port)
proc_running = device.execute_shell_command(cmd).strip()
proc_running = proc_running.split("\n")
for data in proc_running:
if proc_name in data and "grep" not in data:
device.log.debug('{} running status:{}'.format(proc_name, data))
if str(port) in data and "grep" not in data:
data = data.split()
return data[1]
if double_check:
cmd = 'ps -A | grep %s' % proc_name
proc_running = device.execute_shell_command(cmd).strip()
proc_running = proc_running.split("\n")
for data in proc_running:
if proc_name in data:
device.log.debug('{} running status double_check:{}'.format(proc_name, data))
data = data.split()
return data[0]
data = data[len(data) - 1]
device.log.debug('{} proc:{}'.format(port, data))
data = data.split("/")
return data[0]
return ""
def calculate_elapsed_time(begin, end):
"""计算时间间隔
Args:
begin: int/datetime, begin time
end : int/datetime, end time
Returns:
elapsed time description
"""
elapsed = []
# 传入datetime对象
if isinstance(begin, datetime) and isinstance(end, datetime):
total_seconds = (end - begin).total_seconds()
# 传入耗时秒数
else:
total_seconds = end - begin
total_seconds = float(round(total_seconds, 3))
seconds = int(total_seconds)
if seconds < 0:
return f"calculate error, total seconds is {total_seconds}"
if seconds == 0:
milliseconds = int((total_seconds - seconds) * 1000)
if milliseconds > 0:
return "{} ms".format(milliseconds)
else:
return "0 second"
d, s = divmod(seconds, 24 * 60 * 60)
if d == 1:
elapsed.append("1 day")
if d > 1:
elapsed.append("{} days".format(d))
h, s = divmod(s, 60 * 60)
if h == 1:
elapsed.append("1 hour")
if h > 1:
elapsed.append("{} hours".format(h))
m, s = divmod(s, 60)
if m == 1:
elapsed.append("1 minute")
if m > 1:
elapsed.append("{} minutes".format(m))
if s == 1:
elapsed.append("1 second")
if s > 1:
elapsed.append("{} seconds".format(s))
return " ".join(elapsed)
def copy_folder(src, dst):
if not os.path.exists(src):
LOG.error(f"copy folder error, source path '{src}' does not exist")
return
if not os.path.exists(dst):
os.makedirs(dst)
for filename in os.listdir(src):
fr_path = os.path.join(src, filename)
to_path = os.path.join(dst, filename)
if os.path.isfile(fr_path):
shutil.copy(fr_path, to_path)
if os.path.isdir(fr_path):
os.makedirs(to_path)
copy_folder(fr_path, to_path)