mirror of
https://gitee.com/openharmony/testfwk_xdevice
synced 2024-11-27 01:20:53 +00:00
commit
40979290ca
@ -218,8 +218,7 @@ class Device(IDevice):
|
||||
stdout = self.execute_shell_command(command, timeout=5 * 1000,
|
||||
output_flag=False,
|
||||
retry=retry,
|
||||
abort_on_exception=
|
||||
abort_on_exception).strip()
|
||||
abort_on_exception=abort_on_exception).strip()
|
||||
if stdout:
|
||||
LOG.debug(stdout)
|
||||
return stdout
|
||||
@ -726,9 +725,9 @@ class Device(IDevice):
|
||||
except Exception as _:
|
||||
time.sleep(3)
|
||||
self._proxy.init(port=self._h_port, addr=self.host, device=self)
|
||||
|
||||
if self._uitestdeamon is not None:
|
||||
self._uitestdeamon.init(self)
|
||||
finally:
|
||||
if self._uitestdeamon is not None:
|
||||
self._uitestdeamon.init(self)
|
||||
|
||||
if self._proxy:
|
||||
return self._proxy
|
||||
@ -781,6 +780,7 @@ class Device(IDevice):
|
||||
@summary: 截取手机屏幕图片并保存
|
||||
@param name: 保存的图片名称,通过getTakePicturePath方法获取保存全路径
|
||||
'''
|
||||
path = ""
|
||||
try:
|
||||
temp_path = os.path.join(self._device_log_path, "temp")
|
||||
path = os.path.join(temp_path, name)
|
||||
@ -789,7 +789,6 @@ class Device(IDevice):
|
||||
self.pull_file("/data/screen.png", path)
|
||||
except Exception as error:
|
||||
self.log.error("devicetest take_picture: {}".format(str(error)))
|
||||
|
||||
return path
|
||||
|
||||
def set_device_report_path(self, path):
|
||||
|
@ -47,6 +47,7 @@ RETRY_ATTEMPTS = 0
|
||||
HDC = "litehdc.exe"
|
||||
DEFAULT_BAUD_RATE = 115200
|
||||
|
||||
|
||||
def get_hdc_path():
|
||||
from xdevice import Variables
|
||||
user_path = os.path.join(Variables.exec_dir, "resource/tools")
|
||||
|
@ -41,6 +41,7 @@ from xdevice import modify_props
|
||||
from xdevice import get_app_name_by_tool
|
||||
from xdevice import remount
|
||||
from xdevice import disable_keyguard
|
||||
from xdevice import get_class
|
||||
|
||||
from ohos.constants import CKit
|
||||
from ohos.environment.dmlib import CollectingOutputReceiver
|
||||
@ -953,7 +954,7 @@ def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
|
||||
ret_str.append(prefix_char + " ".join(['notTestFile',
|
||||
exclude_file]))
|
||||
elif para_name.strip() == "test" or para_name.strip() == "class":
|
||||
result = _get_class(junit_paras, prefix_char, para_name.strip())
|
||||
result = get_class(junit_paras, prefix_char, para_name.strip())
|
||||
ret_str.append(result)
|
||||
elif para_name.strip() == "include-annotation":
|
||||
ret_str.append(prefix_char + " ".join(
|
||||
@ -968,40 +969,6 @@ def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
|
||||
return " ".join(ret_str)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _get_class(junit_paras, prefix_char, para_name):
|
||||
if not junit_paras.get(para_name):
|
||||
return ""
|
||||
|
||||
result = ""
|
||||
if prefix_char == "-e":
|
||||
result = " %s class " % prefix_char
|
||||
elif prefix_char == "--":
|
||||
result = " %sclass " % prefix_char
|
||||
elif prefix_char == "-s":
|
||||
result = " %s class " % prefix_char
|
||||
test_items = []
|
||||
for test in junit_paras.get(para_name):
|
||||
test_item = test.split("#")
|
||||
if len(test_item) == 1 or len(test_item) == 2:
|
||||
test_item = "%s" % test
|
||||
test_items.append(test_item)
|
||||
elif len(test_item) == 3:
|
||||
test_item = "%s#%s" % (test_item[1], test_item[2])
|
||||
test_items.append(test_item)
|
||||
else:
|
||||
raise ParamError("The parameter %s %s is error" % (
|
||||
prefix_char, para_name))
|
||||
if not result:
|
||||
LOG.debug("There is unsolved prefix char: %s ." % prefix_char)
|
||||
return result + ",".join(test_items)
|
||||
|
||||
|
||||
def get_app_name(hap_app):
|
||||
hap_name = os.path.basename(hap_app).replace(".hap", "")
|
||||
app_name = ""
|
||||
|
@ -77,6 +77,7 @@ from _core.testkit.kit import remount
|
||||
from _core.testkit.kit import disable_keyguard
|
||||
from _core.testkit.kit import unlock_screen
|
||||
from _core.testkit.kit import unlock_device
|
||||
from _core.testkit.kit import get_class
|
||||
from _core.driver.parser_lite import ShellHandler
|
||||
from _core.report.encrypt import check_pub_key_exist
|
||||
from _core.utils import get_file_absolute_path
|
||||
@ -195,6 +196,7 @@ __all__ = [
|
||||
"disable_keyguard",
|
||||
"unlock_screen",
|
||||
"unlock_device",
|
||||
"get_class",
|
||||
"ShellHandler",
|
||||
"ResultCode",
|
||||
"check_pub_key_exist",
|
||||
|
@ -386,7 +386,7 @@ class Console(object):
|
||||
Scheduler.command_queue.append(args)
|
||||
LOG.info("Input command: {}".format(args))
|
||||
para_list = args.split()
|
||||
argument = self.argument_parser( para_list)
|
||||
argument = self.argument_parser(para_list)
|
||||
if argument.options is None or not argument.valid_param:
|
||||
LOG.warning("Options is None.")
|
||||
return None
|
||||
|
@ -271,7 +271,7 @@ class DriversThread(threading.Thread):
|
||||
failed_list.append(i + "#" + i)
|
||||
else:
|
||||
failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
|
||||
except:
|
||||
except Exception:
|
||||
failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
|
||||
if not failed_list:
|
||||
failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], [])
|
||||
|
@ -144,12 +144,7 @@ class Scheduler(object):
|
||||
self.test_number = len(task.test_drivers)
|
||||
|
||||
if task.config.exectype == TestExecType.device_test:
|
||||
if not hasattr(task.config, "dry_run") or \
|
||||
not task.config.dry_run or \
|
||||
(task.config.dry_run and task.config.retry):
|
||||
self._device_test_execute(task)
|
||||
else:
|
||||
self._dry_run_device_test_execute(task)
|
||||
self._device_test_execute(task)
|
||||
elif task.config.exectype == TestExecType.host_test:
|
||||
self._host_test_execute(task)
|
||||
else:
|
||||
|
@ -39,7 +39,7 @@ TARGET_SDK_VERSION = 22
|
||||
|
||||
__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
|
||||
"get_install_args", "reset_junit_para", "remount", "disable_keyguard",
|
||||
"timeout_callback", "unlock_screen", "unlock_device"]
|
||||
"timeout_callback", "unlock_screen", "unlock_device", "get_class"]
|
||||
|
||||
|
||||
def remount(device):
|
||||
@ -55,7 +55,7 @@ def remount(device):
|
||||
device.execute_shell_command("mount -o rw,remount /%s" % "system")
|
||||
|
||||
|
||||
def _get_class(junit_paras, prefix_char, para_name):
|
||||
def get_class(junit_paras, prefix_char, para_name):
|
||||
if not junit_paras.get(para_name):
|
||||
return ""
|
||||
|
||||
@ -122,7 +122,7 @@ def junit_para_parse(device, junit_paras, prefix_char="-e"):
|
||||
ret_str.append(" ".join([prefix_char, 'notTestFile',
|
||||
exclude_file]))
|
||||
elif para_name.strip() == "test" or para_name.strip() == "class":
|
||||
result = _get_class(junit_paras, prefix_char, para_name.strip())
|
||||
result = get_class(junit_paras, prefix_char, para_name.strip())
|
||||
ret_str.append(result)
|
||||
elif para_name.strip() == "include-annotation":
|
||||
ret_str.append(" ".join([prefix_char, "annotation",
|
||||
|
@ -687,24 +687,4 @@ def do_module_kit_teardown(request):
|
||||
for kit in getattr(device, ConfigConst.module_kits, []):
|
||||
if check_device_name(device, kit, step="teardown"):
|
||||
kit.__teardown__(device)
|
||||
setattr(device, ConfigConst.module_kits, [])
|
||||
|
||||
|
||||
def get_version_for_setup():
|
||||
relative_path = "resource/version.txt"
|
||||
parent_dir = os.path.abspath(os.path.dirname(__file__))
|
||||
version_file = os.path.normpath(
|
||||
os.path.join(parent_dir, relative_path))
|
||||
ver = "0.0.0"
|
||||
if os.path.isfile(version_file):
|
||||
flags = os.O_RDONLY
|
||||
modes = stat.S_IWUSR | stat.S_IRUSR
|
||||
with os.fdopen(os.open(version_file, flags, modes),
|
||||
"rb") as ver_file:
|
||||
content_list = ver_file.read().decode("utf-8").split("\n")
|
||||
for line in content_list:
|
||||
if line.strip() and "-v" in line:
|
||||
ver = line.strip().split('-')[1]
|
||||
ver = ver.split(':')[0][1:]
|
||||
break
|
||||
return ver
|
||||
setattr(device, ConfigConst.module_kits, [])
|
Loading…
Reference in New Issue
Block a user