This commit is contained in:
deveco_xdevice 2022-11-18 17:35:03 +08:00
commit 244d7dcf5a
3 changed files with 133 additions and 33 deletions

View File

@ -206,7 +206,7 @@ class CppTestDriver(IDriver):
command = "./%s" % self.execute_bin
report_path = "/%s/%s/" % ("reports", self.execute_bin.split(".")[0])
self.config.device_xml_path = (self.linux_directory + report_path).\
self.config.device_xml_path = (self.linux_directory + report_path). \
replace("//", "/")
self.config.device_report_path = execute_dir + report_path
@ -232,7 +232,7 @@ class CppTestDriver(IDriver):
self.rerun = True
timeout_config = get_config_value('timeout',
json_config.get_driver(), False)
json_config.get_driver(), False)
if timeout_config:
self.config.timeout = int(timeout_config) // 1000
else:
@ -619,6 +619,9 @@ class CTestDriver(IDriver):
def __init__(self):
self.file_name = ""
self.run_third = False
self.kit_type = None
self.auto_deploy = None
def __check_environment__(self, device_options):
if len(device_options) != 1 or \
@ -649,19 +652,23 @@ class CTestDriver(IDriver):
kit_instances = get_kit_instances(json_config,
request.config.resource_path,
request.config.testcases_path)
for (kit_instance, kit_info) in zip(kit_instances,
json_config.get_kits()):
if isinstance(kit_instance, DeployToolKit):
LOG.debug("Run ctest third party")
self._run_ctest_third_party(source=source, request=request,
time_out=int(
kit_instance.timeout))
break
for (_, kit_info) in zip(kit_instances, json_config.get_kits()):
self.auto_deploy = kit_info.get("auto_deploy", "")
self.kit_type = kit_info.get("type", "")
if self.kit_type == CKit.deploytool:
self.run_third = True
LOG.info("Kit type:{}".format(self.kit_type))
LOG.info("Auto deploy:{}".format(self.auto_deploy))
LOG.info("Run third:{}".format(self.run_third))
if self.run_third:
LOG.info("Run the third-party vendor part")
if self.auto_deploy in ["False", "flase"]:
self._run_ctest_third_party(source=source, request=request)
else:
LOG.debug("Run ctest")
self._run_ctest(source=source, request=request,
timeout=int(kit_instance.timeout))
break
self._run_ctest_upgrade_party(source=source, request=request)
else:
LOG.debug("Run ctest")
self._run_ctest(source=source, request=request)
except (LiteDeviceExecuteCommandError, Exception) as exception:
LOG.error(exception, error_no=getattr(exception, "error_no",
@ -777,6 +784,52 @@ class CTestDriver(IDriver):
self.config.device.device.com_dict.get(
ComType.deploy_com).close()
def _run_ctest_upgrade_party(self, source=None, request=None, timeout=90):
parser_instances = []
parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
try:
if not source:
LOG.error("Error: source don't exist %s." % source,
error_no="00101")
return
version = get_test_component_version(self.config)
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suites_name = self.file_name
parser_instance.product_info.setdefault("Version", version)
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
result = self._reset_third_device(request, source)
self.result = "%s.xml" % os.path.join(
request.config.report_path, "result", self.file_name)
if isinstance(result, list):
self.config.device.device.com_dict.get(
ComType.deploy_com).connect()
LOG.debug("Device com:{}".format(self.config.device.device))
result, _, error = self.config.device.device. \
execute_command_with_timeout(
command=request, case_type=DeviceTestType.ctest_lite,
key=ComType.deploy_com, timeout=timeout, receiver=handler)
else:
handler.__read__(result)
handler.__done__()
device_log_file = get_device_log_file(request.config.report_path,
request.config.device.
__get_serial__())
device_log_file_open = \
os.open(device_log_file, os.O_WRONLY | os.O_CREAT |
os.O_APPEND, FilePermission.mode_755)
with os.fdopen(device_log_file_open, "a") as file_name:
file_name.write("{}{}".format(
"\n".join(result.split("\n")[0:-1]), "\n"))
file_name.flush()
finally:
self.config.device.device.com_dict.get(
ComType.deploy_com).close()
def _reset_device(self, request, source):
json_config = JsonParser(source)
reset_cmd = []
@ -800,6 +853,27 @@ class CTestDriver(IDriver):
reset_cmd = [int(item, 16) for item in reset_cmd]
return reset_cmd
def _reset_third_device(self, request, source):
json_config = JsonParser(source)
reset_cmd = []
kit_instances = get_kit_instances(json_config,
request.config.resource_path,
request.config.testcases_path)
from xdevice import Scheduler
for (kit_instance, kit_info) in zip(kit_instances,
json_config.get_kits()):
if not isinstance(kit_instance, DeployToolKit):
continue
if not self.file_name:
self.file_name = get_config_value(
'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
if not Scheduler.is_execute:
raise ExecuteTerminate("ExecuteTerminate",
error_no="00300")
reset_cmd = kit_instance.__setup__(
self.config.device)
return reset_cmd
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -500,11 +500,10 @@ class ComController:
"""
self.is_open = False
self.com = None
self.serial_port = device.get("com") if device.get("com") else None
self.baud_rate = int(device.get("baud_rate")) if device.get(
"baud_rate") else DEFAULT_BAUD_RATE
self.timeout = int(device.get("timeout")) if device.get(
"timeout") else TIMEOUT
self.serial_port = device.get("com", None)
self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE))
self.timeout = int(device.get("timeout", TIMEOUT))
self.usb_port = device.get("usb_port", None)
def connect(self):
"""

View File

@ -25,7 +25,7 @@ import shutil
import platform
import glob
import time
import sys
from xdevice import Plugin
from xdevice import platform_logger
from xdevice import DeviceAllocationState
@ -193,12 +193,12 @@ class MountKit(ITestKit):
liteos_commands = ["cd /", "umount device_directory",
"mount nfs_ip:nfs_directory device"
"_directory nfs"]
linux_commands = ["cd /%s" % "storage",
"umount -f /%s/%s" % ("storage", "device_directory"),
linux_commands = ["cd /{}".format("storage"),
"umount -f /{}/{}".format("storage", "device_directory"),
"toybox mount -t nfs -o nolock,addr=nfs_ip nfs_ip:nfs_directory "
"/%s/%s" % ("storage", "device_directory"),
"chmod 755 -R /%s/%s" % (
"storage", "device_directory")]
"/{}/{}".format("storage", "device_directory"),
"chmod 755 -R /{}/{}".format(
"storage", "device_directory")]
if not linux_host or not linux_directory:
raise LiteDeviceMountError(
"nfs server miss ip or directory[00108]", error_no="00108")
@ -701,28 +701,55 @@ class DeployToolKit(ITestKit):
self.config = None
self.auto_deploy = None
self.device_label = None
self.timeout = None
self.time_out = None
self.paths = None
self.upgrade_file_path = None
self.burn_tools = None
def __check_config__(self, config):
self.config = config
self.paths = get_config_value("paths", config)
self.burn_file = get_config_value("burn_file", config, is_list=False)
self.auto_deploy = get_config_value('auto_deploy',
config, is_list=False)
self.device_label = get_config_value("device_label", config,
is_list=False)
self.timeout = get_config_value("time_out", config,
self.time_out = get_config_value("timeout", config,
is_list=False)
self.upgrade_file_path = get_config_value("upgrade_file_path", config, is_list=False)
self.burn_tools = get_config_value("burn_tools", config, is_list=False)
if not self.auto_deploy or not self.device_label or not self.time_out:
if not self.auto_deploy or not self.upgrade_file_path or not self.time_out:
msg = "The config for deploy tool kit is" \
"invalid: device label :{} time out:{}".format(
self.device_label, self.time_out)
"invalid: upgrade_file_path :{} time out:{}".format(
self.upgrade_file_path, self.time_out)
LOG.error(msg, error_no="00108")
return TypeError(msg)
def __setup__(self, device, **kwargs):
args = kwargs
request = args.get("request", None)
request.config.deploy_tool_kit = self.config
LOG.info("Upgrade file path:{}".format(self.upgrade_file_path))
upgrade_file_name = os.path.basename(self.upgrade_file_path)
if self.upgrade_file_path.startswith("resource"):
self.upgrade_file_path = get_file_absolute_path(
os.path.join("tools", upgrade_file_name), self.paths)
sys.path.insert(0, os.path.dirname(self.upgrade_file_path))
serial_port = device.device.com_dict.get(ComType.deploy_com).serial_port
LOG.debug("Serial port:{}".format(serial_port))
baud_rate = device.device.com_dict.get(ComType.deploy_com).baud_rate
usb_port = device.device.com_dict.get(ComType.cmd_com).usb_port
patch_file = get_file_absolute_path(self.burn_file, self.paths)
upgrade_name = upgrade_file_name.split(".py")[0]
import_cmd_str = "from {} import {} as upgrade_device".format(
upgrade_name, upgrade_name)
scope = {}
exec(import_cmd_str, scope)
upgrade_device = scope.get("upgrade_device", "None")
upgrade = upgrade_device(serial_port=serial_port, baud_rate=baud_rate,
patch_file=patch_file, usb_port=usb_port)
upgrade_result = upgrade.burn()
if upgrade_result:
return upgrade.reset_device()
return None
def __teardown__(self, device):
pass