修复通过querySmall.bin获取认证设备信息可能包含设备日志的问题;修改user_config.xml格式问题

Signed-off-by: deveco_xdevice <liguangjie1@huawei.com>
This commit is contained in:
deveco_xdevice 2024-11-26 11:51:47 +08:00
parent e0f46bf452
commit df3f92a141
2 changed files with 131 additions and 83 deletions

View File

@ -64,7 +64,7 @@
<web_resource>
<enable>FALSE</enable>
<url></url>
</web_resource>>
</web_resource>
</resource>
<devicelog>
<enable>ON</enable>

View File

@ -33,16 +33,15 @@ from xdevice import ParamError
from xdevice import ITestKit
from xdevice import get_config_value
from xdevice import get_file_absolute_path
from xdevice import ConfigConst
from xdevice import get_local_ip
from xdevice import FilePermission
from xdevice import DeviceTestType
from xdevice import DeviceLabelType
from xdevice import JsonParser
from ohos.config.config_manager import OHOSUserConfigManager
from ohos.constants import ComType
from ohos.constants import CKit
from ohos.constants import DeviceLiteKernel
from ohos.drivers.constants import get_nfs_server
from ohos.error import ErrorMessage
from ohos.exception import LiteDeviceConnectError
from ohos.exception import LiteDeviceError
@ -55,7 +54,81 @@ LOG = platform_logger("KitLite")
RESET_CMD = "0xEF, 0xBE, 0xAD, 0xDE, 0x0C, 0x00, 0x87, 0x78, 0x00, 0x00, 0x61, 0x94"
def execute_query(device, query):
def copy_file_to_nfs(nfs_config, src):
ip = nfs_config.get("ip")
port = nfs_config.get("port")
nfs_dir = nfs_config.get("dir")
username = nfs_config.get("username")
password = nfs_config.get("password")
is_remote = nfs_config.get("remote", "false")
dst_file = os.path.join(nfs_dir, os.path.basename(src))
if is_remote.lower() == "true":
# copy to remote
client = None
try:
LOG.info(f"Trying to copy the file from {src} to nfs server")
import paramiko
client = paramiko.Transport(ip, int(port))
client.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(client)
sftp.put(localpath=src, remotepath=dst_file)
except OSError as e:
LOG.warning(f"Copy file to nfs server failed, {e}")
finally:
if client is not None:
client.close()
else:
# copy to local
for count in range(1, 4):
LOG.info(f"Trying to copy the file from {src} to nfs server")
try:
os.remove(dst_file)
except (FileNotFoundError, IOError, PermissionError):
pass
try:
shutil.copy(src, nfs_dir)
except (FileNotFoundError, IOError, PermissionError) as e:
LOG.warning(f"Copy file to nfs server failed, {e}")
if check_server_file(src, nfs_dir):
break
LOG.info(f"Trying to copy the file from {src} to nfs server {count} times")
if count == 3:
LOG.error("Copy file to nfs server failed after retry")
LOG.debug("Nfs server: {}".format(glob.glob(os.path.join(nfs_dir, '*.*'))))
def get_file_from_nfs(nfs_config, src, dst):
ip = nfs_config.get("ip")
port = nfs_config.get("port")
username = nfs_config.get("username")
password = nfs_config.get("password")
is_remote = nfs_config.get("remote", "false")
try:
if is_remote.lower() == "true":
import paramiko
client = None
try:
client = paramiko.Transport(ip, int(port))
client.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(client)
sftp.get(remotepath=src, localpath=dst)
finally:
if client is not None:
client.close()
else:
if os.path.exists(dst):
os.remove(dst)
if os.path.exists(src):
shutil.copy(src, dst)
except (FileNotFoundError, IOError) as e:
LOG.error(f"Get file from nfs server failed, {e}")
def execute_query(device, query, request):
if not query:
LOG.debug("query bin is none")
return
@ -63,13 +136,44 @@ def execute_query(device, query):
LOG.debug("query bin has been executed")
return
LOG.debug("execute query bin begins")
commands = ["cd /"]
if device.__get_device_kernel__() == DeviceLiteKernel.linux_kernel:
command = f"chmod +x /storage{query} && /storage{query}"
# query_pth, /storage/test_tool/tools/querySmall.bin
query_pth = f"/storage{query}"
query_dir = os.path.dirname(query_pth)
query_bin = os.path.basename(query_pth)
commands.append(f"chmod +x {query_pth}")
commands.append(f"cd {query_dir}")
commands.append("ls")
commands.append(f"./{query_bin}")
commands.append("ls")
else:
command = f".{query}"
output, _, _ = device.execute_command_with_timeout(command=command, timeout=10)
LOG.debug(output)
params = parse_strings_key_value(output)
# query, /test_tool/tools/querySmall.bin
query_dir = os.path.dirname(query)
query_bin = os.path.basename(query)
commands.append(f"cd {query_dir}")
commands.append("ls")
commands.append(f"./{query_bin}")
commands.append("ls")
content = ""
for command in commands:
output, _, _ = device.execute_command_with_timeout(command=command, timeout=10)
LOG.debug(output)
# 取bin文件的执行输出
if query_bin in command:
content = output
nfs_config = get_nfs_server(request)
src = os.path.join(nfs_config.get("dir"), "querySmall.txt")
dst = os.path.join(request.config.report_path, "log", "querySmall.txt")
get_file_from_nfs(nfs_config, src, dst)
if os.path.exists(dst):
LOG.debug("querySmall.txt exists")
with open(dst, encoding="utf-8") as query_result_f:
content = query_result_f.read()
os.remove(dst)
params = parse_strings_key_value(content)
device.update_device_props(params)
LOG.debug("execute query bin ends")
@ -180,7 +284,7 @@ class MountKit(ITestKit):
LOG.error(msg, error_no="00108")
raise TypeError(ErrorMessage.Config.Code_0302017)
def mount_on_board(self, device=None, remote_info=None, case_type=""):
def mount_on_board(self, device=None, remote_info=None, case_type="", request=None):
"""
Init the environment on the device server, e.g. mount the testcases to
server
@ -204,8 +308,7 @@ class MountKit(ITestKit):
linux_directory = remote_info.get("dir", "")
is_remote = remote_info.get("remote", "false")
liteos_commands = ["cd /", "umount device_directory",
"mount nfs_ip:nfs_directory device"
"_directory nfs"]
"mount nfs_ip:nfs_directory device_directory nfs"]
linux_commands = ["cd /{}".format("storage"),
"umount -f /{}/{}".format("storage", "device_directory"),
"toybox mount -t nfs -o nolock,addr=nfs_ip nfs_ip:nfs_directory "
@ -273,7 +376,7 @@ class MountKit(ITestKit):
result, status, _ = device.execute_command_with_timeout(
command=command, case_type=case_type, timeout=timeout)
LOG.info('Prepare environment success')
execute_query(device, '/test_root/tools/querySmall.bin')
execute_query(device, '/test_root/tools/querySmall.bin', request)
def __setup__(self, device, **kwargs):
"""
@ -287,23 +390,13 @@ class MountKit(ITestKit):
self.check_hcp_mode(request)
device.connect()
config_manager = OHOSUserConfigManager(
config_file=request.get(ConfigConst.configfile, ""),
env=request.get(ConfigConst.test_environment, ""))
remote_info = config_manager.get_user_config("testcases/server",
filter_name=self.server)
copy_list = self.copy_to_server(remote_info.get("dir"),
remote_info.get("ip"),
request, request.config.testcases_path)
remote_info = get_nfs_server(request)
copy_list = self.copy_to_server(remote_info, request.config.testcases_path)
self.mount_on_board(device=device, remote_info=remote_info,
case_type=DeviceTestType.cpp_test_lite)
case_type=DeviceTestType.cpp_test_lite, request=request)
return copy_list
def copy_to_server(self, linux_directory, linux_host, request,
testcases_dir):
def copy_to_server(self, remote_info, testcases_dir):
file_local_paths = []
# find querySmall.bin
query_small_src = "resource/tools/querySmall.bin"
@ -314,10 +407,16 @@ class MountKit(ITestKit):
except ParamError:
LOG.debug("query bin is not found")
is_query_small_copy_before = False
for mount_file in self.mount_list:
source = mount_file.get("source")
if not source:
raise TypeError(ErrorMessage.Config.Code_0302005)
if source == query_small_src:
is_query_small_copy_before = True
if is_query_small_copy_before:
LOG.debug("query bin has been copy to nfs server")
continue
source = source.replace("$testcases/", "").replace("$resources/", "")
file_path = get_file_absolute_path(source, self.paths)
if os.path.isdir(file_path):
@ -329,69 +428,18 @@ class MountKit(ITestKit):
else:
file_local_paths.append(file_path)
config_manager = OHOSUserConfigManager(
config_file=request.get(ConfigConst.configfile, ""),
env=request.get(ConfigConst.test_environment, ""))
remote_info = config_manager.get_user_config("testcases/server",
filter_name=self.server)
self.remote_info = remote_info
ip = linux_host = remote_info.get("ip", "")
port = remote_info.get("port", "")
remote_dir = linux_directory = remote_info.get("dir", "")
if not remote_info:
err_msg = ErrorMessage.Config.Code_0302022.format(self.remote)
LOG.error(err_msg)
raise TypeError(err_msg)
is_remote = remote_info.get("remote", "false")
if (str(get_local_ip()) == linux_host) and (
linux_directory == ("/data%s" % testcases_dir)):
return []
ip = remote_info.get("ip", "")
port = remote_info.get("port", "")
remote_dir = remote_info.get("dir", "")
if not ip or not port or not remote_dir:
LOG.warning("Nfs server's ip or port or dir is empty")
return []
for _file in file_local_paths:
# remote copy
if not is_remote.lower() == "false":
try:
import paramiko
client = paramiko.Transport(ip, int(port))
client.connect(username=remote_info.get("username"),
password=remote_info.get("password"))
sftp = paramiko.SFTPClient.from_transport(client)
sftp.put(localpath=_file, remotepath=os.path.join(
remote_info.get("dir"), os.path.basename(_file)))
LOG.info("Trying to copy the file from {} to nfs server".
format(_file))
client.close()
except OSError as exception:
msg = "copy file to nfs server failed with error {}" \
.format(exception)
LOG.error(msg, error_no="00403")
# local copy
else:
for count in range(1, 4):
try:
os.remove(os.path.join(remote_info.get("dir"),
os.path.basename(_file)))
except OSError as _:
pass
LOG.info("Trying to copy the file from {} to nfs server".
format(_file))
shutil.copy(_file, remote_info.get("dir"))
if check_server_file(_file, remote_info.get("dir")):
break
else:
LOG.info(
"Trying to copy the file from {} to nfs "
"server {} times".format(_file, count))
if count == 3:
msg = "Copy {} to nfs server failed {} times".format(
os.path.basename(_file), count)
LOG.error(msg, error_no="00403")
LOG.debug("Nfs server:{}".format(glob.glob(
os.path.join(remote_info.get("dir"), '*.*'))))
copy_file_to_nfs(remote_info, _file)
self.file_name_list.append(os.path.basename(_file))
return self.file_name_list
@ -605,7 +653,7 @@ class QueryKit(ITestKit):
if not request:
raise ParamError(ErrorMessage.Config.Code_0302010)
self.mount_kit.__setup__(device, request=request)
execute_query(device, self.query)
execute_query(device, self.query, request)
def __teardown__(self, device):
if device.label != DeviceLabelType.ipcamera: