update OpenHarmony 2.0 Canary

This commit is contained in:
mamingshuai 2021-06-02 00:45:03 +08:00
parent fe14c87b60
commit b1da5a73b4
70 changed files with 7303 additions and 232 deletions

15
.gitattributes vendored Normal file
View File

@ -0,0 +1,15 @@
*.tgz filter=lfs diff=lfs merge=lfs -text
*.trp filter=lfs diff=lfs merge=lfs -text
*.apk filter=lfs diff=lfs merge=lfs -text
*.jar filter=lfs diff=lfs merge=lfs -text
*.mp4 filter=lfs diff=lfs merge=lfs -text
*.zip filter=lfs diff=lfs merge=lfs -text
*.asm filter=lfs diff=lfs merge=lfs -text
*.8svn filter=lfs diff=lfs merge=lfs -text
*.9svn filter=lfs diff=lfs merge=lfs -text
*.dylib filter=lfs diff=lfs merge=lfs -text
*.exe filter=lfs diff=lfs merge=lfs -text
*.a filter=lfs diff=lfs merge=lfs -text
*.so filter=lfs diff=lfs merge=lfs -text
*.bin filter=lfs diff=lfs merge=lfs -text
*.dll filter=lfs diff=lfs merge=lfs -text

View File

@ -1,11 +0,0 @@
### 该问题是怎么引起的?
### 重现步骤
### 报错信息

View File

@ -1,12 +0,0 @@
### 相关的Issue
### 原因(目的、解决的问题等)
### 描述(做了什么,变更了什么)
### 测试用例(新增、改动、可能影响的功能)

1
.gitignore vendored
View File

@ -1,2 +1 @@
*.pyc
/extension/

View File

@ -1,17 +1,17 @@
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//test/xts/tools/lite/build/suite_lite.gni")
deploy_suite("xdevice") {
suite_name = "acts,hits,ssts"
}
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//test/xts/tools/lite/build/suite_lite.gni")
deploy_suite("xdevice") {
suite_name = "acts,hits,ssts"
}

View File

@ -20,7 +20,6 @@ XDevice consists of the following sub-modules:
- **environment**: configures the test framework environment, enabling device discovery and device management.
- **testkit**: provides test tools to implement JSON parsing, network file mounting, etc.
- **resource**: provides the device connection configuration file and report template definitions.
- **adapter**: adapts the test framework to open-source software.
## Directory Structure<a name="section1791423143211"></a>
@ -117,10 +116,10 @@ The environment requirements for using this module are as follows:
```
help:
Use help to get information.
use help to get information.
usage:
run: Display a list of supported run commands.
list: Display a list of supported devices and task records.
run: Display a list of supported run command.
list: Display a list of supported device and task record.
Examples:
help run
help list
@ -136,15 +135,15 @@ The environment requirements for using this module are as follows:
```
list:
Display device list and task records.
This command is used to display device list and task record.
usage:
list
list history
list <id>
Introduction:
list: Display the device list.
list history: Display historical records of a series of tasks.
list <id>: Display historical records of tasks with the specified IDs.
list: display device list
list history: display history record of a serial of tasks
list <id>: display history record about task what contains specific id
Examples:
list
list history
@ -162,8 +161,8 @@ The environment requirements for using this module are as follows:
```
run:
Execute the selected test cases.
The command execution process includes use case compilation, execution, and result collection.
This command is used to execute the selected testcases.
It includes a series of processes such as use case compilation, execution, and result collection.
usage: run [-l TESTLIST [TESTLIST ...] | -tf TESTFILE
[TESTFILE ...]] [-tc TESTCASE] [-c CONFIG] [-sn DEVICE_SN]
[-rp REPORT_PATH [REPORT_PATH ...]]
@ -179,8 +178,8 @@ The environment requirements for using this module are as follows:
action task
Specify tests to run.
positional arguments:
action Specify the action to do.
task Specify the task name, such as ssts, acts, and hits.
action Specify action
task Specify task name,such as "ssts", "acts", "hits"
```
>![](figures/icon-note.gif) **NOTE:**
@ -207,15 +206,15 @@ The environment requirements for using this module are as follows:
Structure of the report directory (the default or the specified one)
├── result # Test case execution results of the module
│ ├── module name.xml
│ ├── ...
│ ├── ... ...
├── log # Running logs of devices and tasks
│ ├── device 1.log
│ ├── ...
│ ├── ... ...
│ ├── task.log
├── summary_report.html # Visual report
├── summary_report.html # Statistical report
└── ...
└── ... ...
```

1
README_zh.md Executable file → Normal file
View File

@ -20,7 +20,6 @@ xdevice主要包括以下几个主要模块
- environment测试框架的环境配置模块提供设备发现设备管理的功能。
- testkit测试框架工具模块提供json解析网络文件挂载等操作。
- resource测试框架资源模块提供设备连接配置文件和报告模板定义。
- adapter测试框架适配开源软件的模块。
## 目录<a name="section1791423143211"></a>

View File

@ -1,16 +0,0 @@
{
"description": "Config for hits test suites",
"kits": [
{
"type": "QueryKit",
"server": "NfsServer",
"mount": [
{
"source": "resource/tools/query.bin",
"target": "/test_root/tools"
}
],
"query" : "/test_root/tools/query.bin"
}
]
}

129
config/user_config.xml Executable file → Normal file
View File

@ -1,66 +1,63 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Copyright (c) 2020 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<user_config>
<environment>
<device type="com"
label="wifiiot">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com></com>
<type>deploy</type>
<baud_rate>115200</baud_rate>
</serial>
</device>
<device type="com"
label="ipcamera">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device type="com"
label="ipcamera">
<ip></ip>
<port></port>
</device>
</environment>
<testcases>
<dir></dir>
<server label="NfsServer">
<ip></ip>
<port></port>
<dir></dir>
<username></username>
<password></password>
<remote></remote>
</server>
</testcases>
<resource>
<dir></dir>
</resource>
<loglevel>INFO</loglevel>
</user_config>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2020 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<user_config>
<environment>
<device type="com" label="wifiiot">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com></com>
<type>deploy</type>
<baud_rate>115200</baud_rate>
</serial>
</device>
<device type="com" label="ipcamera">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device type="com" label="ipcamera">
<ip></ip>
<port></port>
</device>
</environment>
<testcases>
<dir></dir>
<server label="NfsServer">
<ip></ip>
<port></port>
<dir></dir>
<username></username>
<password></password>
<remote></remote>
</server>
</testcases>
<resource>
<dir></dir>
</resource>
<loglevel>INFO</loglevel>
</user_config>

66
extension/setup.py Normal file
View File

@ -0,0 +1,66 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from setuptools import setup
INSTALL_REQUIRES = [
]
def main():
setup(name='xdevice-extension',
description='xdevice extension test framework',
url='',
package_dir={'': 'src'},
packages=['xdevice_extension',
'xdevice_extension._core',
'xdevice_extension._core.driver',
'xdevice_extension._core.environment',
'xdevice_extension._core.executor',
'xdevice_extension._core.testkit'
],
package_data={
},
entry_points={
'driver': [
'drivers=xdevice_extension._core.driver.drivers',
'kunpeng=xdevice_extension._core.driver.kunpeng'
],
'parser': [
'parser=xdevice_extension._core.driver.parser'
],
'device': [
'default=xdevice_extension._core.environment.device'
],
'manager': [
'device=xdevice_extension._core.environment.manager_device'
],
'listener': [
'listener=xdevice_extension._core.executor.listener'
],
'testkit': [
'kit=xdevice_extension._core.testkit.kit'
]
},
zip_safe=False,
install_requires=INSTALL_REQUIRES,
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from ._core.driver.drivers import RemoteTestRunner
from ._core.driver.drivers import RemoteDexRunner
__all__ = [
"RemoteTestRunner",
"RemoteDexRunner"
]

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,249 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
__all__ = ["DeviceOsType", "ProductForm", "TestType", "TestExecType",
"DeviceTestType", "HostTestType", "HostDrivenTestType",
"SchedulerType", "ListenerType", "ToolCommandType",
"TEST_DRIVER_SET", "LogType", "ParserType", "CKit", "ComType",
"DeviceLabelType", "DeviceLiteKernel", "GTestConst", "ManagerType",
"CommonParserType", "DeviceConnectorType", "FilePermission"]
@dataclass
class DeviceOsType(object):
"""
DeviceOsType enumeration
"""
default = "default"
lite = "lite"
@dataclass
class DeviceConnectorType:
hdc = "usb-hdc"
@dataclass
class ProductForm(object):
"""
ProductForm enumeration
"""
phone = "phone"
car = "ivi"
television = "tv"
watch = "watch"
tablet = 'tablet'
@dataclass
class TestType(object):
"""
TestType enumeration
"""
unittest = "unittest"
mst = "moduletest"
systemtest = "systemtest"
perf = "performance"
sec = "security"
reli = "reliability"
dst = "distributedtest"
all = "ALL"
@dataclass
class ComType(object):
"""
ComType enumeration
"""
cmd_com = "cmd"
deploy_com = "deploy"
@dataclass
class DeviceLabelType(object):
"""
DeviceLabelType enumeration
"""
wifiiot = "wifiiot"
ipcamera = "ipcamera"
watch = "watch"
phone = "phone"
@dataclass
class DeviceLiteKernel(object):
"""
Lite device os enumeration
"""
linux_kernel = "linux"
lite_kernel = "lite"
TEST_TYPE_DICT = {
"UT": TestType.unittest,
"MST": TestType.mst,
"ST": TestType.systemtest,
"PERF": TestType.perf,
"SEC": TestType.sec,
"RELI": TestType.reli,
"DST": TestType.dst,
"ALL": TestType.all,
}
@dataclass
class TestExecType(object):
"""
TestExecType enumeration according to test execution method
"""
# A test running on the device
device_test = "device"
# A test running on the host (pc)
host_test = "host"
# A test running on the host that interacts with one or more devices.
host_driven_test = "hostdriven"
@dataclass
class DeviceTestType(object):
"""
DeviceTestType enumeration
"""
cpp_test = "CppTest"
dex_test = "DexTest"
dex_junit_test = "DexJUnitTest"
hap_test = "HapTest"
junit_test = "JUnitTest"
jsunit_test = "JSUnitTest"
ctest_lite = "CTestLite"
cpp_test_lite = "CppTestLite"
lite_cpp_test = "LiteUnitTest"
open_source_test = "OpenSourceTest"
build_only_test = "BuildOnlyTestLite"
@dataclass
class HostTestType(object):
"""
HostTestType enumeration
"""
host_gtest = "HostGTest"
host_junit_test = "HostJUnitTest"
@dataclass
class HostDrivenTestType(object):
"""
HostDrivenType enumeration
"""
device_test = "DeviceTest"
TEST_DRIVER_SET = {
DeviceTestType.cpp_test,
DeviceTestType.dex_test,
DeviceTestType.hap_test,
DeviceTestType.junit_test,
DeviceTestType.dex_junit_test,
DeviceTestType.jsunit_test,
DeviceTestType.cpp_test_lite,
DeviceTestType.ctest_lite,
DeviceTestType.lite_cpp_test,
HostDrivenTestType.device_test
}
@dataclass
class SchedulerType(object):
"""
SchedulerType enumeration
"""
# default scheduler
scheduler = "Scheduler"
@dataclass
class LogType:
tool = "Tool"
device = "Device"
@dataclass
class ListenerType:
log = "Log"
report = "Report"
upload = "Upload"
collect = "Collect"
collect_lite = "CollectLite"
@dataclass
class ParserType:
ctest_lite = "CTestLite"
cpp_test_lite = "CppTestLite"
cpp_test_list_lite = "CppTestListLite"
open_source_test = "OpenSourceTest"
build_only_test = "BuildOnlyTestLite"
@dataclass
class CommonParserType:
jsunit = "JSUnit"
cpptest = "CppTest"
cpptest_list = "CppTestList"
junit = "JUnit"
@dataclass
class ManagerType:
device = "device"
lite_device = "device_lite"
@dataclass
class ToolCommandType(object):
toolcmd_key_help = "help"
toolcmd_key_show = "show"
toolcmd_key_run = "run"
toolcmd_key_quit = "quit"
toolcmd_key_list = "list"
@dataclass
class CKit:
push = "PushKit"
command = "CommandKit"
config = "ConfigKit"
wifi = "WIFIKit"
propertycheck = 'PropertyCheckKit'
sts = 'STSKit'
shell = "ShellKit"
testbundle = "TestBundleKit"
appinstall = "AppInstallKit"
@dataclass
class GTestConst(object):
exec_para_filter = "--gtest_filter"
exec_para_level = "--gtest_testsize"
class FilePermission(object):
mode_777 = 0o777
mode_755 = 0o755
mode_644 = 0o644

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,104 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import platform
import subprocess
import sys
import uuid
from xdevice import platform_logger
from xdevice import IDriver
from xdevice import Plugin
from xdevice_extension._core.utils import get_decode
KUNPENG_TEST = "KunpengTest"
LOG = platform_logger(KUNPENG_TEST)
@Plugin(type=Plugin.DRIVER, id=KUNPENG_TEST)
class KunpengTest(IDriver):
"""
KunpengTest is a Test that runs a host-driven test on given kunpeng
servers.
"""
# test driver config
config = None
result = ""
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def __execute__(self, request):
self.config = request.config
# 1.parse config file
mainconfig_file = request.get_config_file()
if not mainconfig_file:
LOG.error("config file not exists")
return
LOG.debug("KunpengTest mainconfig_file FilePath: %s" % mainconfig_file)
# 2.set params
tmp_id = str(uuid.uuid4())
tmp_folder = os.path.join(self.config.report_path, "temp")
self.config.tmp_sub_folder = os.path.join(tmp_folder, "task_" + tmp_id)
os.makedirs(self.config.tmp_sub_folder, exist_ok=True)
# 3.test execution
self._start_kunpengtest_with_cmd(mainconfig_file)
return
def _start_kunpengtest_with_cmd(self, mainconfig_file):
from xdevice import Variables
# insert& _kunpengtest path for loading kunpengtest module
kunpengtest_module = os.path.join(Variables.modules_dir,
"_kunpengtest")
sys.path.insert(1, kunpengtest_module)
cmd_parts = []
if platform.system() == "Windows":
cmd_parts.append("python")
else:
cmd_parts.append("python3")
relative_path = "uniautos/src/Framework/Dev/bin/UniAutosScript.py"
cmd_parts.append(os.path.abspath(os.path.join(kunpengtest_module,
relative_path)))
cmd_parts.append("-c")
cmd_parts.append(mainconfig_file)
cmd_parts.append("-rp")
cmd_parts.append(self.config.tmp_sub_folder)
cmd = " ".join(cmd_parts)
LOG.info("start kunpengtest with cmd: %s" % cmd)
try:
proc = subprocess.Popen(cmd_parts, shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(out, _) = proc.communicate()
out = get_decode(out).strip()
for line in out.split("\n"):
LOG.info(line)
except (subprocess.CalledProcessError, FileNotFoundError) as error:
LOG.error("kunpeng test error: %s" % error)
def __result__(self):
return self.result if os.path.exists(self.result) else ""

View File

@ -0,0 +1,766 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import re
import time
import datetime
from enum import Enum
from xdevice import LifeCycle
from xdevice import IParser
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import check_pub_key_exist
from xdevice import StateRecorder
from xdevice import TestDescription
from xdevice import ResultCode
from xdevice_extension._core.constants import CommonParserType
__all__ = ["CppTestParser", "CppTestListParser",
"JunitParser", "JSUnitParser"]
_INFORMATIONAL_MARKER = "[----------]"
_START_TEST_RUN_MARKER = "[==========] Running"
_TEST_RUN_MARKER = "[==========]"
_GTEST_DRYRUN_MARKER = "Running main() "
_START_TEST_MARKER = "[ RUN ]"
_OK_TEST_MARKER = "[ OK ]"
_SKIPPED_TEST_MARKER = "[ SKIPPED ]"
_FAILED_TEST_MARKER = "[ FAILED ]"
_ALT_OK_MARKER = "[ OK ]"
_TIMEOUT_MARKER = "[ TIMEOUT ]"
_START_JSUNIT_RUN_MARKER = "[start] start run suites"
_START_JSUNIT_SUITE_RUN_MARKER = "[suite start]"
_START_JSUNIT_SUITE_END_MARKER = "[suite end]"
_END_JSUNIT_RUN_MARKER = "[end] run suites end"
_PASS_JSUNIT_MARKER = "[pass]"
_FAIL_JSUNIT_MARKER = "[fail]"
_ACE_LOG_MARKER = "app Log"
LOG = platform_logger("Parser")
@Plugin(type=Plugin.PARSER, id=CommonParserType.cpptest)
class CppTestParser(IParser):
def __init__(self):
self.state_machine = StateRecorder()
self.suite_name = ""
self.listeners = []
self.product_info = {}
self.is_params = False
def get_suite_name(self):
return self.suite_name
def get_listeners(self):
return self.listeners
def __process__(self, lines):
if not self.state_machine.suites_is_started():
self.state_machine.trace_logs.extend(lines)
for line in lines:
if not check_pub_key_exist():
LOG.debug(line)
self.parse(line)
def __done__(self):
suite_result = self.state_machine.get_suites()
if not suite_result.suites_name:
return
for listener in self.get_listeners():
suites = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuites, test_result=suites,
suites_name=suites.suites_name,
product_info=suites.product_info)
self.state_machine.current_suites = None
def parse(self, line):
if self.state_machine.suites_is_started() or line.startswith(
_TEST_RUN_MARKER):
if line.startswith(_START_TEST_RUN_MARKER):
message = line[len(_TEST_RUN_MARKER):].strip()
self.handle_suites_started_tag(message)
elif line.startswith(_INFORMATIONAL_MARKER):
pattern = r"(.*) (\(\d+ ms total\))"
message = line[len(_INFORMATIONAL_MARKER):].strip()
if re.match(pattern, line.strip()):
self.handle_suite_ended_tag(message)
elif re.match(r'(\d+) test[s]? from (.*)', message):
self.handle_suite_started_tag(message)
elif line.startswith(_TEST_RUN_MARKER):
if not self.state_machine.suites_is_running():
return
message = line[len(_TEST_RUN_MARKER):].strip()
self.handle_suites_ended_tag(message)
elif line.startswith(_START_TEST_MARKER):
# Individual test started
message = line[len(_START_TEST_MARKER):].strip()
self.handle_test_started_tag(message)
else:
self.process_test(line)
def process_test(self, line):
if _SKIPPED_TEST_MARKER in line:
message = line[line.index(_SKIPPED_TEST_MARKER) + len(
_SKIPPED_TEST_MARKER):].strip()
if not self.state_machine.test_is_running():
LOG.error(
"Found {} without {} before, wrong GTest log format".
format(line, _START_TEST_MARKER))
return
self.handle_test_ended_tag(message, ResultCode.SKIPPED)
elif _OK_TEST_MARKER in line:
message = line[line.index(_OK_TEST_MARKER) + len(
_OK_TEST_MARKER):].strip()
if not self.state_machine.test_is_running():
LOG.error(
"Found {} without {} before, wrong GTest log format".
format(line, _START_TEST_MARKER))
return
self.handle_test_ended_tag(message, ResultCode.PASSED)
elif _ALT_OK_MARKER in line:
message = line[line.index(_ALT_OK_MARKER) + len(
_ALT_OK_MARKER):].strip()
self.fake_run_marker(message)
self.handle_test_ended_tag(message, ResultCode.PASSED)
elif _FAILED_TEST_MARKER in line:
message = line[line.index(_FAILED_TEST_MARKER) + len(
_FAILED_TEST_MARKER):].strip()
if not self.state_machine.suite_is_running():
return
if not self.state_machine.test_is_running():
self.fake_run_marker(message)
self.handle_test_ended_tag(message, ResultCode.FAILED)
elif _TIMEOUT_MARKER in line:
message = line[line.index(_TIMEOUT_MARKER) + len(
_TIMEOUT_MARKER):].strip()
self.fake_run_marker(message)
self.handle_test_ended_tag(message, ResultCode.FAILED)
elif self.state_machine.test_is_running():
self.append_test_output(line)
def handle_test_suite_failed(self, error_msg):
error_msg = "Unknown error" if error_msg is None else error_msg
LOG.info("Test run failed: {}".format(error_msg))
if self.state_machine.test_is_running():
self.state_machine.test().is_completed = True
for listener in self.get_listeners():
test_result = copy.copy(self.currentTestResult)
listener.__failed__(LifeCycle.TestCase, test_result)
listener.__ended__(LifeCycle.TestCase, test_result)
self.state_machine.suite().stacktrace = error_msg
self.state_machine.suite().is_completed = True
for listener in self.get_listeners():
suite_result = copy.copy(self.currentSuiteResult)
listener.__failed__(LifeCycle.TestSuite, suite_result)
listener.__ended__(LifeCycle.TestSuite, suite_result)
def handle_test_started_tag(self, message):
test_class, test_name, _ = self.parse_test_description(
message)
test_result = self.state_machine.test(reset=True)
test_result.test_class = test_class
test_result.test_name = test_name
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__started__(LifeCycle.TestCase, test_result)
@classmethod
def parse_test_description(cls, message):
run_time = 0
matcher = re.match(r'(.*) \((\d+) ms\)', message)
if matcher:
test_class, test_name = matcher.group(1).rsplit(".", 1)
run_time = int(matcher.group(2))
else:
test_class, test_name = message.rsplit(".", 1)
return test_class, test_name, run_time
def handle_test_ended_tag(self, message, test_status):
test_class, test_name, run_time = self.parse_test_description(
message)
test_result = self.state_machine.test()
test_result.run_time = int(run_time)
test_result.code = test_status.value
test_result.current = self.state_machine.running_test_index + 1
if not test_result.is_running():
LOG.error(
"Test has no start tag when trying to end test: %s", message)
return
found_unexpected_test = False
if test_result.test_class != test_class:
LOG.error(
"Expected class: {} but got:{} ".format(test_result.test_class,
test_class))
found_unexpected_test = True
if test_result.test_name != test_name:
LOG.error(
"Expected test: {} but got: {}".format(test_result.test_name,
test_name))
found_unexpected_test = True
if found_unexpected_test or ResultCode.FAILED == test_status:
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__failed__(LifeCycle.TestCase, result)
elif ResultCode.SKIPPED == test_status:
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__skipped__(LifeCycle.TestCase, result)
self.state_machine.test().is_completed = True
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__ended__(LifeCycle.TestCase, result)
self.state_machine.running_test_index += 1
def fake_run_marker(self, message):
fake_marker = re.compile(" +").split(message)
self.handle_test_started_tag(fake_marker)
def handle_suites_started_tag(self, message):
self.state_machine.get_suites(reset=True)
matcher = re.match(r'Running (\d+) test[s]? from .*', message)
expected_test_num = int(matcher.group(1)) if matcher else -1
if expected_test_num >= 0:
test_suites = self.state_machine.get_suites()
test_suites.suites_name = self.get_suite_name()
test_suites.test_num = expected_test_num
test_suites.product_info = self.product_info
for listener in self.get_listeners():
suite_report = copy.copy(test_suites)
listener.__started__(LifeCycle.TestSuites, suite_report)
def handle_suite_started_tag(self, message):
self.state_machine.suite(reset=True)
matcher = re.match(r'(\d+) test[s]? from (.*)', message)
expected_test_num = int(matcher.group(1)) if matcher else -1
if expected_test_num >= 0:
test_suite = self.state_machine.suite()
test_suite.suite_name = matcher.group(2)
test_suite.test_num = expected_test_num
for listener in self.get_listeners():
suite_report = copy.copy(test_suite)
listener.__started__(LifeCycle.TestSuite, suite_report)
def handle_suite_ended_tag(self, message):
self.state_machine.running_test_index = 0
suite_result = self.state_machine.suite()
matcher = re.match(r'.*\((\d+) ms total\)', message)
if matcher:
suite_result.run_time = int(matcher.group(1))
suite_result.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
def handle_suites_ended_tag(self, message):
suites = self.state_machine.get_suites()
matcher = re.match(r'.*\((\d+) ms total\)', message)
if matcher:
suites.run_time = int(matcher.group(1))
suites.is_completed = True
for listener in self.get_listeners():
copy_suites = copy.copy(suites)
listener.__ended__(LifeCycle.TestSuites, test_result=copy_suites,
suites_name=suites.suites_name,
product_info=suites.product_info,
suite_report=True)
def append_test_output(self, message):
if self.state_machine.test().stacktrace:
self.state_machine.test().stacktrace += "\r\n"
self.state_machine.test().stacktrace += message
@staticmethod
def handle_test_run_failed(error_msg):
if not error_msg:
error_msg = "Unknown error"
if not check_pub_key_exist():
LOG.debug("error_msg:%s" % error_msg)
def mark_test_as_blocked(self, test):
if not self.state_machine.current_suite and not test.class_name:
return
suites_result = self.state_machine.get_suites(reset=True)
suites_result.suites_name = self.get_suite_name()
suite_name = self.state_machine.current_suite.suite_name if \
self.state_machine.current_suite else None
suite_result = self.state_machine.suite(reset=True)
test_result = self.state_machine.test(reset=True)
suite_result.suite_name = suite_name or test.class_name
suite_result.suite_num = 1
test_result.test_class = test.class_name
test_result.test_name = test.test_name
test_result.stacktrace = "error_msg: run crashed"
test_result.num_tests = 1
test_result.run_time = 0
test_result.code = ResultCode.BLOCKED.value
for listener in self.get_listeners():
suite_report = copy.copy(suites_result)
listener.__started__(LifeCycle.TestSuites, suite_report)
for listener in self.get_listeners():
suite_report = copy.copy(suite_result)
listener.__started__(LifeCycle.TestSuite, suite_report)
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__started__(LifeCycle.TestCase, test_result)
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__ended__(LifeCycle.TestCase, test_result)
for listener in self.get_listeners():
suite_report = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite_report,
is_clear=True)
self.__done__()
@Plugin(type=Plugin.PARSER, id=CommonParserType.cpptest_list)
class CppTestListParser(IParser):
def __init__(self):
self.last_test_class_name = None
self.tests = []
def __process__(self, lines):
for line in lines:
if not check_pub_key_exist():
LOG.debug(line)
self.parse(line)
def __done__(self):
pass
def parse(self, line):
class_matcher = re.match('^([a-zA-Z]+.*)\\.$', line)
method_matcher = re.match('\\s+([a-zA-Z_]+[\\S]*)(.*)?(\\s+.*)?$',
line)
if class_matcher:
self.last_test_class_name = class_matcher.group(1)
elif method_matcher:
if not self.last_test_class_name:
LOG.error("parsed new test case name %s but no test class name"
" has been set" % line)
else:
test = TestDescription(self.last_test_class_name,
method_matcher.group(1))
self.tests.append(test)
else:
if not check_pub_key_exist():
LOG.debug("line ignored: %s" % line)
class StatusCodes(Enum):
FAILURE = -2
START = 1
ERROR = -1
SUCCESS = 0
IN_PROGRESS = 2
IGNORE = -3
BLOCKED = 3
class Prefixes(Enum):
STATUS = "INSTRUMENTATION_STATUS: "
STATUS_CODE = "INSTRUMENTATION_STATUS_CODE: "
STATUS_FAILED = "INSTRUMENTATION_FAILED: "
CODE = "INSTRUMENTATION_CODE: "
RESULT = "INSTRUMENTATION_RESULT: "
TIME_REPORT = "Time: "
@Plugin(type=Plugin.PARSER, id=CommonParserType.junit)
class JunitParser(IParser):
def __init__(self):
self.state_machine = StateRecorder()
self.suite_name = ""
self.listeners = []
self.current_key = None
self.current_value = None
self.start_time = datetime.datetime.now()
self.test_time = 0
self.test_run_finished = False
def get_suite_name(self):
return self.suite_name
def get_listeners(self):
return self.listeners
def __process__(self, lines):
for line in lines:
if not check_pub_key_exist():
LOG.debug(line)
self.parse(line)
def __done__(self):
suite_result = self.state_machine.suite()
suite_result.run_time = self.test_time
suite_result.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite,
suite_report=True)
self.state_machine.current_suite = None
def parse(self, line):
if line.startswith(Prefixes.STATUS_CODE.value):
self.submit_current_key_value()
self.parse_status_code(line)
elif line.startswith(Prefixes.STATUS.value):
self.submit_current_key_value()
self.parse_key(line, len(Prefixes.STATUS.value))
elif line.startswith(Prefixes.RESULT.value):
self.test_run_finished = True
elif line.startswith(Prefixes.STATUS_FAILED.value) or \
line.startswith(Prefixes.CODE.value):
self.submit_current_key_value()
self.test_run_finished = True
elif line.startswith(Prefixes.TIME_REPORT.value):
self.parse_time(line)
else:
if self.current_value:
self.current_value = self.current_value + r"\r\n"
self.current_value = self.current_value + line
elif line:
pass
def parse_key(self, line, key_start_pos):
key_value = line[key_start_pos:].split("=", 1)
if len(key_value) == 2:
self.current_key = key_value[0]
self.current_value = key_value[1]
def parse_time(self, line):
message = line[len(Prefixes.TIME_REPORT.value):]
self.test_time = float(message.replace(",", "")) * 1000
@staticmethod
def check_legality(name):
if not name or name == "null":
return False
return True
def parse_status_code(self, line):
value = line[len(Prefixes.STATUS_CODE.value):]
test_info = self.state_machine.test()
test_info.code = int(value)
if test_info.code != StatusCodes.IN_PROGRESS:
if self.check_legality(test_info.test_class) and \
self.check_legality(test_info.test_name):
self.report_result(test_info)
self.clear_current_test_info()
def clear_current_test_info(self):
self.state_machine.current_test = None
def submit_current_key_value(self):
if self.current_key and self.current_value:
status_value = self.current_value
test_info = self.state_machine.test()
if self.current_key == "class":
test_info.test_class = status_value
elif self.current_key == "test":
test_info.test_name = status_value
elif self.current_key == "numtests":
test_info.num_tests = int(status_value)
elif self.current_key == "Error":
self.handle_test_run_failed(status_value)
elif self.current_key == "stack":
test_info.stacktrace = status_value
elif self.current_key == "stream":
pass
self.current_key = None
self.current_value = None
def report_result(self, test_info):
if not test_info.test_name or not test_info.test_class:
LOG.info("invalid instrumentation status bundle")
return
test_info.is_completed = True
self.report_test_run_started(test_info)
if test_info.code == StatusCodes.START.value:
self.start_time = datetime.datetime.now()
for listener in self.get_listeners():
result = copy.copy(test_info)
listener.__started__(LifeCycle.TestCase, result)
elif test_info.code == StatusCodes.FAILURE.value:
self.state_machine.running_test_index += 1
test_info.current = self.state_machine.running_test_index
end_time = datetime.datetime.now()
run_time = (end_time - self.start_time).total_seconds()
test_info.run_time = int(run_time * 1000)
for listener in self.get_listeners():
result = copy.copy(test_info)
result.code = ResultCode.FAILED.value
listener.__ended__(LifeCycle.TestCase, result)
elif test_info.code == StatusCodes.ERROR.value:
self.state_machine.running_test_index += 1
test_info.current = self.state_machine.running_test_index
end_time = datetime.datetime.now()
run_time = (end_time - self.start_time).total_seconds()
test_info.run_time = int(run_time * 1000)
for listener in self.get_listeners():
result = copy.copy(test_info)
result.code = ResultCode.FAILED.value
listener.__ended__(LifeCycle.TestCase, result)
elif test_info.code == StatusCodes.SUCCESS.value:
self.state_machine.running_test_index += 1
test_info.current = self.state_machine.running_test_index
end_time = datetime.datetime.now()
run_time = (end_time - self.start_time).total_seconds()
test_info.run_time = int(run_time * 1000)
for listener in self.get_listeners():
result = copy.copy(test_info)
result.code = ResultCode.PASSED.value
listener.__ended__(LifeCycle.TestCase, result)
elif test_info.code == StatusCodes.IGNORE.value:
end_time = datetime.datetime.now()
run_time = (end_time - self.start_time).total_seconds()
test_info.run_time = int(run_time * 1000)
for listener in self.get_listeners():
result = copy.copy(test_info)
result.code = ResultCode.SKIPPED.value
listener.__skipped__(LifeCycle.TestCase, result)
elif test_info.code == StatusCodes.BLOCKED.value:
test_info.current = self.state_machine.running_test_index
end_time = datetime.datetime.now()
run_time = (end_time - self.start_time).total_seconds()
test_info.run_time = int(run_time * 1000)
for listener in self.get_listeners():
result = copy.copy(test_info)
result.code = ResultCode.BLOCKED.value
listener.__ended__(LifeCycle.TestCase, result)
self.output_stack_trace(test_info)
@classmethod
def output_stack_trace(cls, test_info):
if check_pub_key_exist():
return
if test_info.stacktrace:
stack_lines = test_info.stacktrace.split(r"\r\n")
LOG.error("stacktrace information is:")
for line in stack_lines:
line.strip()
if line:
LOG.error(line)
def report_test_run_started(self, test_result):
test_suite = self.state_machine.suite()
if not self.state_machine.suite().is_started:
if not test_suite.test_num or not test_suite.suite_name:
test_suite.suite_name = self.get_suite_name()
test_suite.test_num = test_result.num_tests
for listener in self.get_listeners():
suite_report = copy.copy(test_suite)
listener.__started__(LifeCycle.TestSuite, suite_report)
@staticmethod
def handle_test_run_failed(error_msg):
if not error_msg:
error_msg = "Unknown error"
if not check_pub_key_exist():
LOG.debug("error_msg:%s" % error_msg)
def mark_test_as_failed(self, test):
test_info = self.state_machine.test()
if test_info:
test_info.test_class = test.class_name
test_info.test_name = test.test_name
test_info.code = StatusCodes.START.value
self.report_result(test_info)
test_info.code = StatusCodes.FAILURE.value
self.report_result(test_info)
self.__done__()
def mark_test_as_blocked(self, test):
test_info = self.state_machine.test()
if test_info:
test_info.test_class = test.class_name
test_info.test_name = test.test_name
test_info.num_tests = 1
test_info.run_time = 0
test_info.code = StatusCodes.START.value
self.report_result(test_info)
test_info.code = StatusCodes.BLOCKED.value
self.report_result(test_info)
self.__done__()
@Plugin(type=Plugin.PARSER, id=CommonParserType.jsunit)
class JSUnitParser(IParser):
last_line = ""
pattern = r"(\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\.\d{3}) "
def __init__(self):
self.state_machine = StateRecorder()
self.suites_name = ""
self.listeners = []
def get_listeners(self):
return self.listeners
def __process__(self, lines):
if not self.state_machine.suites_is_started():
self.state_machine.trace_logs.extend(lines)
for line in lines:
self.parse(line)
def __done__(self):
suite_result = self.state_machine.suite()
suite_result.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite,
suite_report=True)
self.state_machine.current_suite = None
def parse(self, line):
if (self.state_machine.suites_is_started() or line.find(
_START_JSUNIT_RUN_MARKER) != -1) and line.find(
_ACE_LOG_MARKER) != -1:
if line.find(_START_JSUNIT_RUN_MARKER) != -1:
self.handle_suites_started_tag()
elif line.endswith(_END_JSUNIT_RUN_MARKER):
self.handle_suites_ended_tag()
elif line.find(_START_JSUNIT_SUITE_RUN_MARKER) != -1:
self.handle_suite_started_tag(line.strip())
elif line.endswith(_START_JSUNIT_SUITE_END_MARKER):
self.handle_suite_ended_tag()
elif _PASS_JSUNIT_MARKER in line or _FAIL_JSUNIT_MARKER \
in line:
self.handle_one_test_tag(line.strip())
self.last_line = line
def parse_test_description(self, message):
pattern = r"\[(pass|fail)\]"
year = time.strftime("%Y")
filter_message = message.split("app Log:")[1].strip()
end_time = "%s-%s" % \
(year, re.match(self.pattern, message).group().strip())
start_time = "%s-%s" % \
(year, re.match(self.pattern,
self.last_line.strip()).group().strip())
start_timestamp = int(time.mktime(
time.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int(
start_time.split(".")[-1])
end_timestamp = int(time.mktime(
time.strptime(end_time, "%Y-%m-%d %H:%M:%S.%f"))) * 1000 + int(
end_time.split(".")[-1])
run_time = end_timestamp - start_timestamp
_, status_end_index = re.match(pattern, filter_message).span()
status = filter_message[:status_end_index]
test_name = filter_message[status_end_index:]
status_dict = {"pass": ResultCode.PASSED, "fail": ResultCode.FAILED,
"ignore": ResultCode.SKIPPED}
status = status_dict.get(status[1:-1])
return test_name, status, run_time
def handle_suites_started_tag(self):
self.state_machine.get_suites(reset=True)
test_suites = self.state_machine.get_suites()
test_suites.suites_name = self.suites_name
test_suites.test_num = 0
for listener in self.get_listeners():
suite_report = copy.copy(test_suites)
listener.__started__(LifeCycle.TestSuites, suite_report)
def handle_suites_ended_tag(self):
suites = self.state_machine.get_suites()
suites.is_completed = True
for listener in self.get_listeners():
listener.__ended__(LifeCycle.TestSuites, test_result=suites,
suites_name=suites.suites_name)
def handle_one_test_tag(self, message):
test_name, status, run_time = \
self.parse_test_description(message)
test_result = self.state_machine.test(reset=True)
test_suite = self.state_machine.suite()
test_result.test_class = test_suite.suite_name
test_result.test_name = test_name
test_result.run_time = run_time
test_result.code = status.value
test_result.current = self.state_machine.running_test_index + 1
self.state_machine.suite().run_time += run_time
for listener in self.get_listeners():
test_result = copy.copy(test_result)
listener.__started__(LifeCycle.TestCase, test_result)
test_suites = self.state_machine.get_suites()
found_unexpected_test = False
if found_unexpected_test or ResultCode.FAILED == status:
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__failed__(LifeCycle.TestCase, result)
elif ResultCode.SKIPPED == status:
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__skipped__(LifeCycle.TestCase, result)
self.state_machine.test().is_completed = True
test_suite.test_num += 1
test_suites.test_num += 1
for listener in self.get_listeners():
result = copy.copy(test_result)
listener.__ended__(LifeCycle.TestCase, result)
self.state_machine.running_test_index += 1
def fake_run_marker(self, message):
fake_marker = re.compile(" +").split(message)
self.processTestStartedTag(fake_marker)
def handle_suite_started_tag(self, message):
self.state_machine.suite(reset=True)
test_suite = self.state_machine.suite()
if re.match(r".*\[suite start\].*", message):
_, index = re.match(r".*\[suite start\]", message).span()
if message[index:]:
test_suite.suite_name = message[index:]
else:
test_suite.suite_name = self.suite_name
test_suite.test_num = 0
for listener in self.get_listeners():
suite_report = copy.copy(test_suite)
listener.__started__(LifeCycle.TestSuite, suite_report)
def handle_suite_ended_tag(self):
suite_result = self.state_machine.suite()
suites = self.state_machine.get_suites()
suite_result.run_time = suite_result.run_time
suites.run_time += suite_result.run_time
suite_result.is_completed = True
for listener in self.get_listeners():
suite = copy.copy(suite_result)
listener.__ended__(LifeCycle.TestSuite, suite, is_clear=True)
def append_test_output(self, message):
if self.state_machine.test().stacktrace:
self.state_machine.test().stacktrace = \
"%s\r\n" % self.state_machine.test().stacktrace
self.state_machine.test().stacktrace = \
''.join((self.state_machine.test().stacktrace, message))

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,387 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import threading
from xdevice import DeviceOsType
from xdevice import ProductForm
from xdevice import ReportException
from xdevice import IDevice
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import exec_cmd
from xdevice import ConfigConst
from xdevice_extension._core.environment.dmlib import HdcHelper
from xdevice_extension._core.exception import HdcError
from xdevice_extension._core.environment.dmlib import CollectingOutputReceiver
from xdevice_extension._core.environment.device_state import \
DeviceAllocationState
from xdevice_extension._core.utils import check_path_legal
from xdevice_extension._core.utils import convert_serial
from xdevice_extension._core.constants import DeviceConnectorType
__all__ = ["Device"]
TIMEOUT = 300 * 1000
RETRY_ATTEMPTS = 2
DEFAULT_UNAVAILABLE_TIMEOUT = 20 * 1000
BACKGROUND_TIME = 2 * 60 * 1000
LOG = platform_logger("Device")
def perform_device_action(func):
def device_action(self, *args, **kwargs):
if not self.get_recover_state():
LOG.debug("device %s %s is false" % (self.device_sn,
ConfigConst.recover_state))
return
# avoid infinite recursion, such as device reboot
abort_on_exception = bool(kwargs.get("abort_on_exception", False))
if abort_on_exception:
result = func(self, *args, **kwargs)
return result
tmp = int(kwargs.get("retry", RETRY_ATTEMPTS))
retry = tmp + 1 if tmp > 0 else 1
exception = None
for _ in range(retry):
try:
result = func(self, *args, **kwargs)
return result
except ReportException as error:
self.log.exception("Generate report error!", exc_info=False)
exception = error
except (ConnectionResetError, ConnectionRefusedError) as error:
self.log.error("error type: %s, error: %s" %
(error.__class__.__name__, error))
if self.usb_type == DeviceConnectorType.hdc:
cmd = "hdc reset"
self.log.info("re-execute hdc reset")
exec_cmd(cmd)
if not self.recover_device():
LOG.debug("set device %s %s false" % (
self.device_sn, ConfigConst.recover_state))
self.set_recover_state(False)
raise error
exception = error
except HdcError as error:
self.log.error("error type: %s, error: %s" %
(error.__class__.__name__, error))
if not self.recover_device():
LOG.debug("set device %s %s false" % (
self.device_sn, ConfigConst.recover_state))
self.set_recover_state(False)
raise error
exception = error
except Exception as error:
self.log.exception("error type: %s, error: %s" % (
error.__class__.__name__, error), exc_info=False)
exception = error
raise exception
return device_action
@Plugin(type=Plugin.DEVICE, id=DeviceOsType.default)
class Device(IDevice):
"""
Class representing a device.
Each object of this class represents one device in xDevice,
including handles to hdc, fastboot, and test agent.
Attributes:
device_sn: A string that's the serial number of the device.
"""
device_sn = None
host = None
port = None
usb_type = None
is_timeout = False
device_hilog_proc = None
device_os_type = DeviceOsType.default
test_device_state = None
device_allocation_state = DeviceAllocationState.available
label = None
log = platform_logger("Device")
device_state_monitor = None
reboot_timeout = 2 * 60 * 1000
hilog_file_pipe = None
model_dict = {
'default': ProductForm.phone,
'car': ProductForm.car,
'tv': ProductForm.television,
'watch': ProductForm.watch,
'tablet': ProductForm.tablet
}
def __init__(self):
self.extend_value = {}
self.device_lock = threading.RLock()
def __eq__(self, other):
return self.device_sn == other.__get_serial__() and \
self.device_os_type == other.device_os_type
def __set_serial__(self, device_sn=""):
self.device_sn = device_sn
return self.device_sn
def __get_serial__(self):
return self.device_sn
def get(self, key=None, default=None):
if not key:
return default
value = getattr(self, key, None)
if value:
return value
else:
return self.extend_value.get(key, default)
def recover_device(self):
if not self.get_recover_state():
LOG.debug("device %s %s is false, cannot recover device" % (
self.device_sn, ConfigConst.recover_state))
return
LOG.debug("wait device %s to recover" % self.device_sn)
return self.device_state_monitor.wait_for_device_available()
def get_device_type(self):
model = self.get_property("ro.build.characteristics",
abort_on_exception=True)
self.label = self.model_dict.get(model, None)
def get_property(self, prop_name, retry=RETRY_ATTEMPTS,
abort_on_exception=False):
"""
Hdc command, ddmlib function.
"""
command = "getprop %s" % prop_name
stdout = self.execute_shell_command(
command, timeout=5 * 1000, output_flag=False, retry=retry,
abort_on_exception=abort_on_exception).strip()
if stdout:
LOG.debug(stdout)
return stdout
@perform_device_action
def hdc_command(self, command, **kwargs):
timeout = int(kwargs.get("timeout", TIMEOUT)) / 1000
error_print = bool(kwargs.get("error_print", True))
join_result = bool(kwargs.get("join_result", False))
timeout_msg = '' if timeout == 300.0 else \
" with timeout %ss" % timeout
if self.usb_type == DeviceConnectorType.hdc:
LOG.debug("%s execute command hdc %s%s" % (
convert_serial(self.device_sn), command, timeout_msg))
cmd = ["hdc_std", "-t", self.device_sn]
if isinstance(command, list):
cmd.extend(command)
else:
command = command.strip()
cmd.extend(command.split(" "))
result = exec_cmd(cmd, timeout, error_print, join_result)
if not result:
return result
for line in str(result).split("\n"):
if line.strip():
LOG.debug(line.strip())
return result
@perform_device_action
def execute_shell_command(self, command, timeout=TIMEOUT,
receiver=None, **kwargs):
if not receiver:
collect_receiver = CollectingOutputReceiver()
HdcHelper.execute_shell_command(
self, command, timeout=timeout,
receiver=collect_receiver, **kwargs)
return collect_receiver.output
else:
return HdcHelper.execute_shell_command(
self, command, timeout=timeout,
receiver=receiver, **kwargs)
def execute_shell_cmd_background(self, command, timeout=TIMEOUT,
receiver=None):
status = HdcHelper.execute_shell_command(self, command,
timeout=timeout,
receiver=receiver)
self.wait_for_device_not_available(DEFAULT_UNAVAILABLE_TIMEOUT)
self.device_state_monitor.wait_for_device_available(BACKGROUND_TIME)
cmd = "target mount" \
if self.usb_type == DeviceConnectorType.hdc else "remount"
self.hdc_command(cmd)
self.start_catch_device_log()
return status
def wait_for_device_not_available(self, wait_time):
return self.device_state_monitor.wait_for_device_not_available(
wait_time)
def _wait_for_device_online(self, wait_time=None):
return self.device_state_monitor.wait_for_device_online(wait_time)
def _do_reboot(self):
HdcHelper.reboot(self)
if not self.wait_for_device_not_available(
DEFAULT_UNAVAILABLE_TIMEOUT):
LOG.error("Did not detect device {} becoming unavailable "
"after reboot".format(convert_serial(self.device_sn)))
def _reboot_until_online(self):
self._do_reboot()
self._wait_for_device_online()
def reboot(self):
self._reboot_until_online()
self.device_state_monitor.wait_for_device_available(
self.reboot_timeout)
self.enable_hdc_root()
self.start_catch_device_log()
@perform_device_action
def install_package(self, package_path, command=""):
if package_path is None:
raise HdcError(
"install package: package path cannot be None!")
return HdcHelper.install_package(self, package_path, command)
@perform_device_action
def uninstall_package(self, package_name):
return HdcHelper.uninstall_package(self, package_name)
@perform_device_action
def push_file(self, local, remote, **kwargs):
"""
Push a single file.
The top directory won't be created if is_create is False (by default)
and vice versa
"""
if local is None:
raise HdcError("XDevice Local path cannot be None!")
remote_is_dir = kwargs.get("remote_is_dir", False)
if remote_is_dir:
ret = self.execute_shell_command("test -d %s && echo 0" % remote)
if not (ret != "" and len(str(ret).split()) != 0 and
str(ret).split()[0] == "0"):
self.execute_shell_command("mkdir -p %s" % remote)
is_create = kwargs.get("is_create", False)
timeout = kwargs.get("timeout", TIMEOUT)
HdcHelper.push_file(self, local, remote, is_create=is_create,
timeout=timeout)
if not self.is_file_exist(remote):
LOG.error("push %s to %s failed" % (local, remote))
raise HdcError("push %s to %s failed" % (local, remote))
@perform_device_action
def pull_file(self, remote, local, **kwargs):
"""
Pull a single file.
The top directory won't be created if is_create is False (by default)
and vice versa
"""
is_create = kwargs.get("is_create", False)
timeout = kwargs.get("timeout", TIMEOUT)
HdcHelper.pull_file(self, remote, local, is_create=is_create,
timeout=timeout)
def is_hdc_root(self):
output = self.execute_shell_command("id")
return "uid=0(root)" in output
def enable_hdc_root(self):
if self.is_hdc_root():
return True
for index in range(3):
cmd = "smode" \
if self.usb_type == DeviceConnectorType.hdc else "root"
output = self.hdc_command(cmd)
if self.is_hdc_root():
return True
LOG.debug(
"hdc root on %s unsuccessful on attempt %d. "
"Output: %s" % (
convert_serial(self.device_sn), index, output))
return False
def is_directory(self, path):
path = check_path_legal(path)
output = self.execute_shell_command("ls -ld {}".format(path))
if output and output.startswith('d'):
return True
return False
def is_file_exist(self, file_path):
file_path = check_path_legal(file_path)
command = ["hdc_std", "shell", "ls", file_path]
output = exec_cmd(command)
if output and "No such file or directory" not in output:
return True
return False
def start_catch_device_log(self, hilog_file_pipe=None):
"""
Starts hdc log for each device in separate subprocesses and save
the logs in files.
"""
if hilog_file_pipe:
self.hilog_file_pipe = hilog_file_pipe
self._start_catch_device_log()
def stop_catch_device_log(self):
"""
Stops all hdc log subprocesses.
"""
self._stop_catch_device_log()
def _start_catch_device_log(self):
pass
def _stop_catch_device_log(self):
pass
def get_recover_result(self, retry=RETRY_ATTEMPTS):
command = "getprop ro.product.device"
stdout = self.execute_shell_command(command, timeout=5 * 1000,
output_flag=False, retry=retry,
abort_on_exception=True).strip()
if stdout:
LOG.debug(stdout)
return stdout
def set_recover_state(self, state):
with self.device_lock:
setattr(self, ConfigConst.recover_state, state)
def get_recover_state(self, default_state=True):
with self.device_lock:
state = getattr(self, ConfigConst.recover_state, default_state)
return state
def close(self):
pass

View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from threading import Condition
from xdevice_extension._core.environment.device_state import TestDeviceState
from xdevice_extension._core.utils import convert_serial
CHECK_POLL_TIME = 3
MAX_CHECK_POLL_TIME = 30
class DeviceStateListener(object):
def __init__(self, expected_state):
self.expected_state = expected_state
self.condition = Condition()
def state_changed(self, new_state):
if self.expected_state == new_state:
with self.condition:
self.condition.notify_all()
def get_expected_state(self):
return self.expected_state
class DeviceStateMonitor(object):
"""
Provides facilities for monitoring the state of a Device.
"""
def __init__(self, device):
self.state_listener = []
self.device_state = device.test_device_state
self.device = device
self.default_online_timeout = 1 * 60 * 1000
self.default_available_timeout = 6 * 60 * 1000
def wait_for_device_state(self, state, wait_time):
listener = DeviceStateListener(state)
if self.device_state == state:
return True
self.device.log.debug(
"wait device %s for %s" % (convert_serial(self.device.device_sn),
state))
self.add_device_state_listener(listener)
with listener.condition:
try:
listener.condition.wait(wait_time / 1000)
finally:
self.remove_device_state_listener(listener)
return self.device_state == state
def wait_for_device_not_available(self, wait_time):
return self.wait_for_device_state(TestDeviceState.NOT_AVAILABLE,
wait_time)
def wait_for_device_online(self, wait_time=None):
if not wait_time:
wait_time = self.default_online_timeout
return self.wait_for_device_state(TestDeviceState.ONLINE, wait_time)
def wait_for_boot_complete(self, wait_time):
counter = 1
start_time = int(time.time()*1000)
self.device.log.debug("wait for boot complete, and wait time: %s ms" %
wait_time)
while int(time.time()*1000) - start_time < wait_time:
try:
result = self.device.get_recover_result(retry=0)
if result == "1":
return True
except Exception as exception:
self.device.log.error("wait for boot complete exception: %s"
% exception)
time.sleep(min(CHECK_POLL_TIME * counter, MAX_CHECK_POLL_TIME))
counter = counter + 1
return False
def wait_for_device_available(self, wait_time=None):
if not wait_time:
wait_time = self.default_available_timeout
start_time = int(time.time()*1000)
if not self.wait_for_device_online(wait_time):
return False
elapsed_time = int(time.time()*1000) - start_time
if not self.wait_for_boot_complete(wait_time - elapsed_time):
return False
return True
def remove_device_state_listener(self, listener):
self.state_listener.remove(listener)
def add_device_state_listener(self, listener):
self.state_listener.append(listener)
def set_state(self, new_state):
if not new_state:
return
self.device_state = new_state
for listener in self.state_listener:
listener.state_changed(new_state)

View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
from enum import Enum
from enum import unique
@unique
class TestDeviceState(Enum):
FASTBOOT = "FASTBOOT"
ONLINE = "ONLINE"
RECOVERY = "RECOVERY"
NOT_AVAILABLE = "NOT_AVAILABLE"
@staticmethod
def get_test_device_state(device_state):
if device_state is None:
return TestDeviceState.NOT_AVAILABLE
elif device_state == DeviceState.ONLINE:
return TestDeviceState.ONLINE
elif device_state == DeviceState.OFFLINE:
return TestDeviceState.NOT_AVAILABLE
elif device_state == DeviceState.RECOVERY:
return TestDeviceState.RECOVERY
elif device_state == DeviceState.BOOTLOADER:
return TestDeviceState.FASTBOOT
else:
return TestDeviceState.NOT_AVAILABLE
@unique
class DeviceState(Enum):
BOOTLOADER = "bootloader"
OFFLINE = "offline"
ONLINE = "device"
RECOVERY = "recovery"
@staticmethod
def get_state(state):
for device_state in DeviceState:
if device_state.value == state:
return device_state
return None
@unique
class DeviceEvent(Enum):
"""
Represents a test device event that can change allocation state
"""
CONNECTED_ONLINE = "CONNECTED_ONLINE"
CONNECTED_OFFLINE = "CONNECTED_OFFLINE"
STATE_CHANGE_ONLINE = "STATE_CHANGE_ONLINE"
STATE_CHANGE_OFFLINE = "STATE_CHANGE_OFFLINE"
DISCONNECTED = "DISCONNECTED"
FORCE_AVAILABLE = "FORCE_AVAILABLE"
AVAILABLE_CHECK_PASSED = "AVAILABLE_CHECK_PASSED"
AVAILABLE_CHECK_FAILED = "AVAILABLE_CHECK_FAILED"
AVAILABLE_CHECK_IGNORED = "AVAILABLE_CHECK_IGNORED"
ALLOCATE_REQUEST = "ALLOCATE_REQUEST"
FORCE_ALLOCATE_REQUEST = "FORCE_ALLOCATE_REQUEST"
FREE_AVAILABLE = "FREE_AVAILABLE"
FREE_UNRESPONSIVE = "FREE_UNRESPONSIVE"
FREE_UNAVAILABLE = "FREE_UNAVAILABLE"
FREE_UNKNOWN = "FREE_UNKNOWN"
def handle_allocation_event(old_state, event):
new_state = None
if event == DeviceEvent.CONNECTED_ONLINE \
or event == DeviceEvent.STATE_CHANGE_ONLINE:
if old_state == DeviceAllocationState.allocated:
new_state = old_state
else:
new_state = DeviceAllocationState.checking_availability
elif event == DeviceEvent.CONNECTED_OFFLINE \
or event == DeviceEvent.STATE_CHANGE_OFFLINE \
or event == DeviceEvent.DISCONNECTED:
if old_state == DeviceAllocationState.allocated:
new_state = old_state
else:
new_state = DeviceAllocationState.unknown
elif event == DeviceEvent.ALLOCATE_REQUEST:
new_state = DeviceAllocationState.allocated
elif event == DeviceEvent.FREE_AVAILABLE:
new_state = DeviceAllocationState.available
elif event == DeviceEvent.FREE_UNAVAILABLE:
new_state = DeviceAllocationState.unknown
elif event == DeviceEvent.AVAILABLE_CHECK_IGNORED:
new_state = DeviceAllocationState.ignored
elif event == DeviceEvent.AVAILABLE_CHECK_PASSED:
new_state = DeviceAllocationState.available
return new_state
@dataclass
class DeviceAllocationState:
ignored = "Ignored"
available = "Available"
allocated = "Allocated"
checking_availability = "Checking_Availability"
unavailable = "Unavailable"
unknown = "unknown"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,59 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from xdevice import IDevice
from xdevice import platform_logger
from xdevice_extension._core.environment.device_state import \
DeviceAllocationState
from xdevice_extension._core.environment.device_state import TestDeviceState
LOG = platform_logger("Emulator")
class Emulator(IDevice):
"""
Class representing an emulator.
Each object of this class represents one emulator in xDevice.
Attributes:
device_sn: A string that's the serial number of the emulator.
"""
def __get_serial__(self):
pass
def __set_serial__(self, device_sn=""):
pass
def __init__(self, device_sn=""):
self.device_sn = device_sn
self.is_timeout = False
self.device_log_proc = None
self.test_device_state = TestDeviceState.ONLINE
self.device_allocation_state = DeviceAllocationState.available
def __serial__(self):
return self.device_sn
def get_device_sn(self):
"""
Returns the serial number of the device.
"""
return self.device_sn

View File

@ -0,0 +1,370 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import threading
from xdevice import UserConfigManager
from xdevice import ManagerType
from xdevice import Plugin
from xdevice import get_plugin
from xdevice import IDeviceManager
from xdevice import platform_logger
from xdevice import ParamError
from xdevice import ConfigConst
from xdevice_extension._core.environment.device_monitor import \
DeviceStateMonitor
from xdevice_extension._core.environment.device_state import DeviceEvent
from xdevice_extension._core.environment.device_state import TestDeviceState
from xdevice_extension._core.environment.device_state import DeviceState
from xdevice_extension._core.environment.device_state import \
handle_allocation_event
from xdevice_extension._core.environment.device_state import \
DeviceAllocationState
from xdevice_extension._core.environment.dmlib import DeviceConnector
from xdevice_extension._core.utils import convert_serial
from xdevice_extension._core.utils import convert_ip
from xdevice_extension._core.utils import convert_port
from xdevice_extension._core.constants import DeviceConnectorType
from xdevice_extension._core.exception import HdcCommandRejectedException
__all__ = ["ManagerDevice"]
LOG = platform_logger("ManagerDevice")
@Plugin(type=Plugin.MANAGER, id=ManagerType.device)
class ManagerDevice(IDeviceManager):
"""
Class representing device manager
managing the set of available devices for testing
"""
def __init__(self):
self.devices_list = []
self.global_device_filter = None
self.lock_con = threading.Condition()
self.list_con = threading.Condition()
self.device_connector = None
self.managed_device_listener = None
self.support_labels = ["phone", "watch", "car", "tv", "tablet", "ivi"]
self.support_types = ["device"]
def init_environment(self, environment="", user_config_file=""):
self._start_device_monitor(environment, user_config_file)
def env_stop(self):
self._stop_device_monitor()
def _start_device_monitor(self, environment="", user_config_file=""):
self.managed_device_listener = ManagedDeviceListener(self)
device = UserConfigManager(
config_file=user_config_file, env=environment).get_device(
"environment/device")
if device:
try:
self.device_connector = DeviceConnector(device.get("ip"),
device.get("port"),
device.get("usb_type"))
self.global_device_filter = UserConfigManager(
config_file=user_config_file, env=environment).get_sn_list(
device.get("sn"))
self.device_connector.add_device_change_listener(
self.managed_device_listener)
self.device_connector.start()
except (ParamError, FileNotFoundError) as error:
self.env_stop()
LOG.debug("start %s error: %s" % (
device.get("usb_type"), error))
if device.get("usb_type") == DeviceConnectorType.hdc:
self.device_connector = DeviceConnector(
device.get("ip"), device.get("port"),
"usb-hdc")
self.device_connector.add_device_change_listener(
self.managed_device_listener)
self.device_connector.start()
else:
raise ParamError("Manager device is not supported, please "
"check config user_config.xml", error_no="00108")
def _stop_device_monitor(self):
self.device_connector.remove_device_change_listener(
self.managed_device_listener)
self.device_connector.terminate()
def find(self, idevice):
LOG.debug("find: apply list con lock")
self.list_con.acquire()
try:
for device in self.devices_list:
if device.device_sn == idevice.device_sn and \
device.device_os_type == idevice.device_os_type:
return device
finally:
LOG.debug("find: release list con lock")
self.list_con.release()
def apply_device(self, device_option, timeout=10):
LOG.debug("apply_device: apply lock con lock")
self.lock_con.acquire()
try:
device = self.allocate_device_option(device_option)
if device:
return device
LOG.debug("wait for available device founded")
self.lock_con.wait(timeout)
return self.allocate_device_option(device_option)
finally:
LOG.debug("apply_device: release lock con lock")
self.lock_con.release()
def allocate_device_option(self, device_option):
"""
Request a device for testing that meets certain criteria.
"""
LOG.debug("allocate_device_option: apply list con lock")
if not self.list_con.acquire(timeout=5):
LOG.debug("allocate_device_option: list con wait timeout")
return None
try:
allocated_device = None
for device in self.devices_list:
if device_option.matches(device):
self.handle_device_event(device,
DeviceEvent.ALLOCATE_REQUEST)
LOG.debug("allocate device sn: %s, type: %s" % (
device.__get_serial__(), device.__class__))
return device
return allocated_device
finally:
LOG.debug("allocate_device_option: release list con lock")
self.list_con.release()
def release_device(self, device):
LOG.debug("release_device: apply list con lock")
self.list_con.acquire()
try:
if device.test_device_state == TestDeviceState.ONLINE:
self.handle_device_event(device, DeviceEvent.FREE_AVAILABLE)
else:
self.handle_device_event(device, DeviceEvent.FREE_UNAVAILABLE)
device.device_id = None
LOG.debug("free device sn: %s, type: %s" % (
device.__get_serial__(), device.__class__.__name__))
finally:
LOG.debug("release_device: release list con lock")
self.list_con.release()
def find_device(self, device_sn, device_os_type):
for device in self.devices_list:
if device.device_sn == device_sn and \
device.device_os_type == device_os_type:
return device
def append_device_by_sort(self, device_instance):
if (not self.global_device_filter or
not self.devices_list or
device_instance.device_sn not in self.global_device_filter):
self.devices_list.append(device_instance)
else:
device_dict = dict(zip(
self.global_device_filter,
list(range(1, len(self.global_device_filter)+1))))
for index in range(len(self.devices_list)):
if self.devices_list[index].device_sn not in \
self.global_device_filter:
self.devices_list.insert(index, device_instance)
break
if device_dict[device_instance.device_sn] < \
device_dict[self.devices_list[index].device_sn]:
self.devices_list.insert(index, device_instance)
break
else:
self.devices_list.append(device_instance)
def find_or_create(self, idevice):
LOG.debug("find_or_create: apply list con lock")
self.list_con.acquire()
try:
device = self.find_device(idevice.device_sn,
idevice.device_os_type)
if device is None:
device = get_plugin(
plugin_type=Plugin.DEVICE,
plugin_id=idevice.device_os_type)[0]
device_instance = device.__class__()
device_instance.__set_serial__(idevice.device_sn)
device_instance.host = idevice.host
device_instance.port = idevice.port
device_instance.usb_type = self.device_connector.usb_type
LOG.debug("create device(%s) host is %s, "
"port is %s, device sn is %s, usb type is %s" %
(device_instance, device_instance.host,
device_instance.port, device_instance.device_sn,
device_instance.usb_type))
device_instance.device_state = DeviceState.get_state(
idevice.device_state)
device_instance.test_device_state = \
TestDeviceState.get_test_device_state(
device_instance.device_state)
device_instance.device_state_monitor = \
DeviceStateMonitor(device_instance)
if idevice.device_state == DeviceState.ONLINE:
device_instance.get_device_type()
self.append_device_by_sort(device_instance)
device = device_instance
else:
LOG.debug("find device(%s), host is %s, "
"port is %s, device sn is %s, usb type is %s" %
(device, device.host, device.port, device.device_sn,
device.usb_type))
return device
except HdcCommandRejectedException as hcr_error:
LOG.debug("%s occurs error. Reason:%s" %
(idevice.device_sn, hcr_error))
finally:
LOG.debug("find_or_create: release list con lock")
self.list_con.release()
def remove(self, idevice):
LOG.debug("remove: apply list con lock")
self.list_con.acquire()
try:
self.devices_list.remove(idevice)
finally:
LOG.debug("remove: release list con lock")
self.list_con.release()
def handle_device_event(self, device, event):
state_changed = None
old_state = device.device_allocation_state
new_state = handle_allocation_event(old_state, event)
if new_state == DeviceAllocationState.checking_availability:
if self.global_device_filter and \
device.device_sn not in self.global_device_filter:
event = DeviceEvent.AVAILABLE_CHECK_IGNORED
else:
event = DeviceEvent.AVAILABLE_CHECK_PASSED
new_state = handle_allocation_event(new_state, event)
if old_state != new_state:
state_changed = True
device.device_allocation_state = new_state
if state_changed is True and \
new_state == DeviceAllocationState.available:
# notify_device_state_change
LOG.debug("handle_device_event apply lock_con")
self.lock_con.acquire()
LOG.debug("find available device")
self.lock_con.notify_all()
LOG.debug("handle_device_event release lock_con")
self.lock_con.release()
if device.device_allocation_state == \
DeviceAllocationState.unknown:
self.remove(device)
return
def launch_emulator(self):
pass
def kill_emulator(self):
pass
def list_devices(self):
print("devices:")
print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}".format(
"Serial", "OsType", "State", "Allocation", "Product", "host",
"port"))
for device in self.devices_list:
print("{0:<20}{1:<16}{2:<16}{3:<16}{4:<16}{5:<16}{6:<16}".format(
convert_serial(device.device_sn), device.device_os_type,
device.test_device_state.value,
device.device_allocation_state,
device.label if device.label else 'None',
convert_ip(device.host), convert_port(device.port)))
class ManagedDeviceListener(object):
"""
A class to listen for and act on device presence updates from ddmlib
"""
def __init__(self, manager):
self.manager = manager
def device_changed(self, idevice):
test_device = self.manager.find_or_create(idevice)
if test_device is None:
return
new_state = TestDeviceState.get_test_device_state(idevice.device_state)
test_device.test_device_state = new_state
if new_state == TestDeviceState.ONLINE:
self.manager.handle_device_event(test_device,
DeviceEvent.STATE_CHANGE_ONLINE)
elif new_state == TestDeviceState.NOT_AVAILABLE:
self.manager.handle_device_event(test_device,
DeviceEvent.STATE_CHANGE_OFFLINE)
test_device.device_state_monitor.set_state(
test_device.test_device_state)
LOG.debug("device changed to %s: %s %s %s %s" % (
new_state, convert_serial(idevice.device_sn),
idevice.device_os_type, idevice.host, idevice.port))
def device_connected(self, idevice):
test_device = self.manager.find_or_create(idevice)
if test_device is None:
return
new_state = TestDeviceState.get_test_device_state(idevice.device_state)
test_device.test_device_state = new_state
if test_device.test_device_state == TestDeviceState.ONLINE:
self.manager.handle_device_event(test_device,
DeviceEvent.CONNECTED_ONLINE)
elif new_state == TestDeviceState.NOT_AVAILABLE:
self.manager.handle_device_event(test_device,
DeviceEvent.CONNECTED_OFFLINE)
test_device.device_state_monitor.set_state(
test_device.test_device_state)
LOG.debug("device connected: %s %s %s %s" % (
convert_serial(idevice.device_sn), idevice.device_os_type,
idevice.host, idevice.port))
LOG.debug("set device %s %s to true" % (
idevice.device_sn, ConfigConst.recover_state))
test_device.set_recover_state(True)
def device_disconnected(self, disconnected_device):
test_device = self.manager.find(disconnected_device)
if test_device is not None:
test_device.test_device_state = TestDeviceState.NOT_AVAILABLE
self.manager.handle_device_event(test_device,
DeviceEvent.DISCONNECTED)
test_device.device_state_monitor.set_state(
TestDeviceState.NOT_AVAILABLE)
LOG.debug("device disconnected: %s %s %s %s" % (
convert_serial(disconnected_device.device_sn),
disconnected_device.device_os_type,
disconnected_device.host, disconnected_device.port))

View File

@ -0,0 +1,97 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from xdevice import DeviceError
class HdcError(DeviceError):
"""
Raised when there is an error in hdc operations.
"""
def __init__(self, error_msg, error_no=""):
super(HdcError, self).__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)
class HdcCommandRejectedException(HdcError):
"""
Exception thrown when hdc refuses a command.
"""
def __init__(self, error_msg, error_no=""):
super(HdcCommandRejectedException, self).__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)
class ShellCommandUnresponsiveException(HdcError):
"""
Exception thrown when a shell command executed on a device takes too long
to send its output.
"""
def __init__(self, error_msg="ShellCommandUnresponsiveException",
error_no=""):
super(ShellCommandUnresponsiveException, self).\
__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)
class DeviceUnresponsiveException(HdcError):
"""
Exception thrown when a shell command executed on a device takes too long
to send its output.
"""
def __init__(self, error_msg="DeviceUnresponsiveException", error_no=""):
super(DeviceUnresponsiveException, self).__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)
class AppInstallError(DeviceError):
def __init__(self, error_msg, error_no=""):
super(AppInstallError, self).__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)
class HapNotSupportTest(DeviceError):
def __init__(self, error_msg, error_no=""):
super(HapNotSupportTest, self).__init__(error_msg, error_no)
self.error_msg = error_msg
self.error_no = error_no
def __str__(self):
return str(self.error_msg)

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,98 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from xdevice import Plugin
from xdevice import LifeCycle
from xdevice import IListener
from xdevice import platform_logger
from xdevice import TestDescription
from xdevice_extension._core.constants import ListenerType
__all__ = ["CollectingTestListener", "CollectingLiteGTestListener"]
LOG = platform_logger("Listener")
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect)
class CollectingTestListener(IListener):
"""
listener test status information to the console
"""
def __init__(self):
self.tests = []
def __started__(self, lifecycle, test_result):
if lifecycle == LifeCycle.TestCase:
if not test_result.test_class or not test_result.test_name:
return
test = TestDescription(test_result.test_class,
test_result.test_name)
if test not in self.tests:
self.tests.append(test)
def __ended__(self, lifecycle, test_result=None, **kwargs):
pass
def __skipped__(self, lifecycle, test_result):
pass
def __failed__(self, lifecycle, test_result):
pass
def get_current_run_results(self):
return self.tests
@Plugin(type=Plugin.LISTENER, id=ListenerType.collect_lite)
class CollectingLiteGTestListener(IListener):
"""
listener test status information to the console
"""
def __init__(self):
self.tests = []
def __started__(self, lifecycle, test_result):
pass
def __ended__(self, lifecycle, test_result=None, **kwargs):
del kwargs
if lifecycle == LifeCycle.TestCase:
if not test_result.test_class or not test_result.test_name:
return
test = TestDescription(test_result.test_class,
test_result.test_name)
if test not in self.tests:
self.tests.append(test)
def __skipped__(self, lifecycle, test_result):
pass
def __failed__(self, lifecycle, test_result):
if lifecycle == LifeCycle.TestCase:
if not test_result.test_class or not test_result.test_name:
return
test = TestDescription(test_result.test_class,
test_result.test_name)
if test not in self.tests:
self.tests.append(test)
def get_current_run_results(self):
return self.tests

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,960 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import platform
import re
import signal
import subprocess
import zipfile
import stat
import time
import json
from dataclasses import dataclass
from tempfile import TemporaryDirectory
from tempfile import NamedTemporaryFile
from xdevice import ITestKit
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import ParamError
from xdevice import get_file_absolute_path
from xdevice import get_config_value
from xdevice import exec_cmd
from xdevice_extension._core.constants import CKit
from xdevice_extension._core.environment.dmlib import CollectingOutputReceiver
from xdevice_extension._core.utils import check_path_legal
from xdevice_extension._core.utils import modify_props
from xdevice_extension._core.exception import AppInstallError
from xdevice_extension._core.constants import DeviceConnectorType
from xdevice_extension._core.utils import convert_serial
__all__ = ["STSKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
"ConfigKit", "AppInstallKit", "junit_para_parse",
"gtest_para_parse", "junit_dex_para_parse", "reset_junit_para"]
LOG = platform_logger("Kit")
@Plugin(type=Plugin.TEST_KIT, id=CKit.sts)
class STSKit(ITestKit):
def __init__(self):
self.sts_version = ""
self.throw_error = ""
def __check_config__(self, config):
self.sts_version = get_config_value('sts-version', config)
self.throw_error = get_config_value('throw-error', config)
if len(self.sts_version) < 1:
raise TypeError(
"The sts_version: {} is invalid".format(self.sts_version))
def __setup__(self, device, **kwargs):
del kwargs
LOG.debug("STSKit setup, device:{}, params:{}".
format(device, self.get_plugin_config().__dict__))
device_spl = device.get_property(Props.security_patch)
if device_spl is None or device_spl == "":
LOG.error("The device security {} is invalid".format(device_spl))
raise ParamError(
"The device security patch version {} is invalid".format(
device_spl))
rex = '^[a-zA-Z\\d\\.]+_([\\d]+-[\\d]+)$'
match = re.match(rex, self.sts_version)
if match is None:
LOG.error("The sts version {} does match the rule".format(
self.sts_version))
raise ParamError("The sts version {} does match the rule".format(
self.sts_version))
sts_version_date_user = match.group(1).join("-01")
sts_version_date_kernel = match.group(1).join("-05")
if device_spl in [sts_version_date_user, sts_version_date_kernel]:
LOG.info(
"The device SPL version {} match the sts version {}".format(
device_spl, self.sts_version))
else:
err_msg = "The device SPL version {} does not match the sts " \
"version {}".format(device_spl, self.sts_version)
LOG.error(err_msg)
raise ParamError(err_msg)
def __teardown__(self, device):
LOG.debug("STSKit teardown: device:{}, params:{}".format
(device, self.get_plugin_config().__dict__))
@Plugin(type=Plugin.TEST_KIT, id=CKit.push)
class PushKit(ITestKit):
def __init__(self):
self.pre_push = ""
self.push_list = ""
self.post_push = ""
self.is_uninstall = ""
self.paths = ""
self.pushed_file = []
self.abort_on_push_failure = True
def __check_config__(self, config):
self.pre_push = get_config_value('pre-push', config)
self.push_list = get_config_value('push', config)
self.post_push = get_config_value('post-push', config)
self.is_uninstall = get_config_value('uninstall', config,
is_list=False, default=True)
self.abort_on_push_failure = get_config_value(
'abort-on-push-failure', config, is_list=False, default=True)
if isinstance(self.abort_on_push_failure, str):
self.abort_on_push_failure = False if \
self.abort_on_push_failure.lower() == "false" else True
self.paths = get_config_value('paths', config)
self.pushed_file = []
def __setup__(self, device, **kwargs):
del kwargs
LOG.debug("PushKit setup, device:{}".format(device.device_sn))
for command in self.pre_push:
run_command(device, command)
for push_info in self.push_list:
files = re.split('->|=>', push_info)
if len(files) != 2:
LOG.error("The push spec is invalid: {}".format(push_info))
continue
src = files[0].strip()
dst = files[1].strip() if files[1].strip().startswith("/") else \
files[1].strip() + Props.dest_root
LOG.debug(
"Trying to push the file local {} to remote {}".format(src,
dst))
try:
real_src_path = get_file_absolute_path(src, self.paths)
except ParamError as error:
if self.abort_on_push_failure:
raise
else:
LOG.warning(error, error_no=error.error_no)
continue
remount(device)
device.push_file(real_src_path, dst)
if os.path.isdir(real_src_path):
self.add_pushed_dir(real_src_path, dst)
else:
if device.is_directory(dst):
self.pushed_file.append(
os.path.join(dst, os.path.basename(real_src_path)))
else:
self.pushed_file.append(dst)
LOG.debug("Push file finished from {} to {}".format(src, dst))
for command in self.post_push:
run_command(device, command)
def add_pushed_dir(self, src, dst):
for root, _, files in os.walk(src):
for file_path in files:
self.pushed_file.append(
os.path.join(root, file_path).replace(src, dst))
def __teardown__(self, device):
LOG.debug("PushKit teardown: device:{}".format(device.device_sn))
if self.is_uninstall:
remount(device)
for file_name in self.pushed_file:
LOG.debug("Trying to remove file {}".format(file_name))
file_name = file_name.replace("\\", "/")
for _ in range(
Props.trying_remove_maximum_times):
collect_receiver = CollectingOutputReceiver()
file_name = check_path_legal(file_name)
device.execute_shell_command("rm -rf {}".format(
file_name), receiver=collect_receiver,
output_flag=False)
if not collect_receiver.output:
LOG.debug(
"Removed file {} successfully".format(file_name))
break
else:
LOG.error("Removed file {} successfully".
format(collect_receiver.output))
else:
LOG.error("Failed to remove file {}".format(file_name))
def __add_pushed_file__(self, device, src, dst):
if device.is_directory(dst):
dst = dst + os.path.basename(src) if dst.endswith(
"/") else dst + "/" + os.path.basename(src)
self.pushed_file.append(dst)
def __add_dir_pushed_files__(self, device, src, dst):
if device.file_exist(device, dst):
for _, dirs, files in os.walk(src):
for file_path in files:
if dst.endswith("/"):
dst = "%s%s" % (dst, os.path.basename(file_path))
else:
dst = "%s/%s" % (dst, os.path.basename(file_path))
self.pushed_file.append(dst)
for dir_name in dirs:
self.__add_dir_pushed_files__(device, dir_name, dst)
else:
self.pushed_file.append(dst)
@Plugin(type=Plugin.TEST_KIT, id=CKit.propertycheck)
class PropertyCheckKit(ITestKit):
def __init__(self):
self.prop_name = ""
self.expected_value = ""
self.throw_error = ""
def __check_config__(self, config):
self.prop_name = get_config_value('property-name', config,
is_list=False)
self.expected_value = get_config_value('expected-value', config,
is_list=False)
self.throw_error = get_config_value('throw-error', config,
is_list=False)
def __setup__(self, device, **kwargs):
del kwargs
LOG.debug("PropertyCheckKit setup, device:{}".format(device.device_sn))
if not self.prop_name:
LOG.warning("The option of property-name not setting")
return
prop_value = device.get_property(self.prop_name)
if not prop_value:
LOG.warning(
"The property {} not found on device, cannot check the value".
format(self.prop_name))
return
if prop_value != self.expected_value:
msg = "The value found for property {} is {}, not same with the " \
"expected {}".format(self.prop_name, prop_value,
self.expected_value)
LOG.warning(msg)
if self.throw_error and self.throw_error.lower() == 'true':
raise Exception(msg)
@classmethod
def __teardown__(cls, device):
LOG.debug("PropertyCheckKit teardown: device:{}".format(
device.device_sn))
@Plugin(type=Plugin.TEST_KIT, id=CKit.shell)
class ShellKit(ITestKit):
def __init__(self):
self.command_list = []
self.tear_down_command = []
self.paths = None
def __check_config__(self, config):
self.command_list = get_config_value('run-command', config)
self.tear_down_command = get_config_value('teardown-command', config)
self.paths = get_config_value('paths', config)
def __setup__(self, device, **kwargs):
del kwargs
LOG.debug("ShellKit setup, device:{}".format(device.device_sn))
if len(self.command_list) == 0:
LOG.info("No setup_command to run, skipping!")
return
for command in self.command_list:
run_command(device, command)
def __teardown__(self, device):
LOG.debug("ShellKit teardown: device:{}".format(device.device_sn))
if len(self.tear_down_command) == 0:
LOG.info("No teardown_command to run, skipping!")
return
for command in self.tear_down_command:
run_command(device, command)
@Plugin(type=Plugin.TEST_KIT, id=CKit.wifi)
class WifiKit(ITestKit):
def __init__(self):
pass
def __check_config__(self, config):
self.certfilename = get_config_value(
'certfilename', config, False,
default=None)
self.certpassword = get_config_value(
'certpassword', config, False,
default=None)
self.wifiname = get_config_value(
'wifiname', config, False,
default=None)
def __setup__(self, device, **kwargs):
request = kwargs.get("request", None)
if not request:
LOG.error("WifiKit need input request")
return
testargs = request.get("testargs", {})
self.certfilename = \
testargs.pop("certfilename", [self.certfilename])[0]
self.wifiname = \
testargs.pop("wifiname", [self.wifiname])[0]
self.certpassword = \
testargs.pop("certpassword", [self.certpassword])[0]
del kwargs
LOG.debug("WifiKit setup, device:{}".format(device.device_sn))
try:
wifi_app_path = get_file_absolute_path(
Props.Paths.service_wifi_app_path)
except ParamError:
wifi_app_path = None
if wifi_app_path is None:
LOG.error("The resource wifi app file does not exist!")
return
try:
pfx_path = get_file_absolute_path(
"tools/wifi/%s" % self.certfilename
) if self.certfilename else None
except ParamError:
pfx_path = None
if pfx_path is None:
LOG.error("The resource wifi pfx file does not exist!")
return
pfx_dest_path = \
"/storage/emulated/0/%s" % self.certfilename
if self.wifiname is None:
LOG.error("The wifi name is not given!")
return
if self.certpassword is None:
LOG.error("The wifi password is not given!")
return
device.install_package(wifi_app_path, command="-r")
device.push_file(pfx_path, pfx_dest_path)
device.execute_shell_command("svc wifi enable")
for _ in range(Props.maximum_connect_wifi_times):
connect_wifi_cmd = Props.connect_wifi_cmd % (
pfx_dest_path,
self.certpassword,
self.wifiname
)
if device.execute_shell_command(connect_wifi_cmd):
LOG.info("Connect wifi successfully")
break
else:
LOG.error("Connect wifi failed")
@classmethod
def __teardown__(cls, device):
LOG.debug("WifiKit teardown: device:{}".format(device.device_sn))
LOG.info("Disconnect wifi")
device.execute_shell_command("svc wifi disable")
@dataclass
class Props:
@dataclass
class Paths:
system_build_prop_path = "/%s/%s" % ("system", "build.prop")
service_wifi_app_path = "tools/wifi/%s" % "Service-wifi.app"
dest_root = "/%s/%s/" % ("data", "data")
mnt_external_storage = "EXTERNAL_STORAGE"
trying_remove_maximum_times = 3
maximum_connect_wifi_times = 3
connect_wifi_cmd = "am instrument -e request \"{module:Wifi, " \
"method:connectWifiByCertificate, params:{'certPath':" \
"'%s'," \
"'certPassword':'%s'," \
"'wifiName':'%s'}}\" " \
"-w com.xdeviceservice.service/.MainInstrumentation"
security_patch = "ro.build.version.security_patch"
@Plugin(type=Plugin.TEST_KIT, id=CKit.config)
class ConfigKit(ITestKit):
def __init__(self):
self.is_connect_wifi = ""
self.is_disconnect_wifi = ""
self.wifi_kit = WifiKit()
self.min_external_store_space = ""
self.is_disable_dialing = ""
self.is_test_harness = ""
self.is_audio_silent = ""
self.is_disable_dalvik_verifier = ""
self.build_prop_list = ""
self.is_enable_hook = ""
self.cust_prop_file = ""
self.is_prop_changed = False
self.local_system_prop_file = ""
self.cust_props = ""
self.is_reboot_delay = ""
self.is_remount = ""
self.local_cust_prop_file = {}
def __check_config__(self, config):
self.is_connect_wifi = get_config_value('connect-wifi', config,
is_list=False, default=False)
self.is_disconnect_wifi = get_config_value(
'disconnect-wifi-after-test', config, is_list=False, default=True)
self.wifi_kit = WifiKit()
self.min_external_store_space = get_config_value(
'min-external-store-space', config)
self.is_disable_dialing = get_config_value('disable-dialing', config)
self.is_test_harness = get_config_value('set-test-harness', config)
self.is_audio_silent = get_config_value('audio-silent', config)
self.is_disable_dalvik_verifier = get_config_value(
'disable-dalvik-verifier', config)
self.build_prop_list = get_config_value('build-prop', config)
self.cust_prop_file = get_config_value('cust-prop-file', config)
self.cust_props = get_config_value('cust-prop', config)
self.is_enable_hook = get_config_value('enable-hook', config)
self.is_reboot_delay = get_config_value('reboot-delay', config)
self.is_remount = get_config_value('remount', config, default=True)
self.local_system_prop_file = NamedTemporaryFile(prefix='build',
suffix='.prop',
delete=False).name
def __setup__(self, device, **kwargs):
del kwargs
LOG.debug("ConfigKit setup, device:{}".format(device.device_sn))
if self.is_remount:
remount(device)
self.is_prop_changed = self.modify_system_prop(device)
self.is_prop_changed = self.modify_cust_prop(
device) or self.is_prop_changed
keep_screen_on(device)
if self.is_enable_hook:
pass
if self.is_prop_changed:
device.reboot()
def __teardown__(self, device):
LOG.debug("ConfigKit teardown: device:{}".format(device.device_sn))
if self.is_remount:
remount(device)
if self.is_connect_wifi and self.is_disconnect_wifi:
self.wifi_kit.__teardown__(device)
if self.is_prop_changed:
device.push_file(self.local_system_prop_file,
Props.Paths.system_build_prop_path)
device.execute_shell_command(
" ".join(["chmod 644", Props.Paths.system_build_prop_path]))
os.remove(self.local_system_prop_file)
for target_file, temp_file in self.local_cust_prop_file.items():
device.push_file(temp_file, target_file)
device.execute_shell_command(
" ".join(["chmod 644", target_file]))
os.remove(temp_file)
def modify_system_prop(self, device):
prop_changed = False
new_props = {}
if self.is_disable_dialing:
new_props['ro.telephony.disable-call'] = 'true'
if self.is_test_harness:
new_props['ro.monkey'] = '1'
new_props['ro.test_harness'] = '1'
if self.is_audio_silent:
new_props['ro.audio.silent'] = '1'
if self.is_disable_dalvik_verifier:
new_props['dalvik.vm.dexopt-flags'] = 'v=n'
for prop in self.build_prop_list:
if prop is None or prop.find("=") < 0 or len(prop.split("=")) != 2:
LOG.warning("The build prop:{} not match the format "
"'key=value'".format(prop))
continue
new_props[prop.split("=")[0]] = prop.split("=")[1]
if new_props:
prop_changed = modify_props(device, self.local_system_prop_file,
Props.Paths.system_build_prop_path,
new_props)
return prop_changed
def modify_cust_prop(self, device):
prop_changed = False
cust_files = {}
new_props = {}
for cust_prop_file in self.cust_prop_file:
# the correct format should be "CustName:/cust/prop/absolutepath"
if len(cust_prop_file.split(":")) != 2:
LOG.error(
"The value %s of option cust-prop-file is incorrect" %
cust_prop_file)
continue
cust_files[cust_prop_file.split(":")[0]] = \
cust_prop_file.split(":")[1]
for prop in self.cust_props:
# the correct format should be "CustName:key=value"
prop_infos = re.split(r'[:|=]', prop)
if len(prop_infos) != 3:
LOG.error(
"The value {} of option cust-prop is incorrect".format(
prop))
continue
file_name, key, value = prop_infos
if file_name not in cust_files:
LOG.error(
"The custName {} must be in cust-prop-file option".format(
file_name))
continue
props = new_props.setdefault(file_name, {})
props[key] = value
for name in new_props.keys():
cust_file = cust_files.get(name)
temp_cust_file = NamedTemporaryFile(prefix='cust', suffix='.prop',
delete=False).name
self.local_cust_prop_file[cust_file] = temp_cust_file
prop_changed = modify_props(device, temp_cust_file, cust_file,
new_props[name]) or prop_changed
return prop_changed
@Plugin(type=Plugin.TEST_KIT, id=CKit.appinstall)
class AppInstallKit(ITestKit):
def __init__(self):
self.app_list = ""
self.is_clean = ""
self.alt_dir = ""
self.ex_args = ""
self.installed_app = []
self.paths = ""
self.is_pri_app = ""
self.pushed_hap_file = []
def __check_config__(self, options):
self.app_list = get_config_value('test-file-name', options)
self.is_clean = get_config_value('cleanup-apps', options, False)
self.alt_dir = get_config_value('alt-dir', options, False)
if self.alt_dir and self.alt_dir.startswith("resource/"):
self.alt_dir = self.alt_dir[len("resource/"):]
self.ex_args = get_config_value('install-arg', options)
self.installed_app = []
self.paths = get_config_value('paths', options)
self.is_pri_app = get_config_value('install-as-privapp', options,
False, default=False)
def __setup__(self, device, **kwargs):
del kwargs
LOG.debug("AppInstallKit setup, device:{}".format(device.device_sn))
if len(self.app_list) == 0:
LOG.info("No app to install, skipping!")
return
for app in self.app_list:
if self.alt_dir:
app_file = get_file_absolute_path(app, self.paths,
self.alt_dir)
else:
app_file = get_file_absolute_path(app, self.paths)
if app_file is None:
LOG.error("The app file {} does not exist".format(app))
continue
if app_file.endswith(".hap"):
self.install_hap(device, app_file)
else:
result = device.install_package(
app_file, get_install_args(
device, app_file, self.ex_args))
if not result.startswith("Success"):
raise AppInstallError(
"Failed to install %s on %s. Reason:%s" %
(app_file, device.__get_serial__(), result))
self.installed_app.append(app_file)
def __teardown__(self, device):
LOG.debug("AppInstallKit teardown: device:{}".format(device.device_sn))
if self.is_clean and str(self.is_clean).lower() == "true":
for app in self.installed_app:
app_name = get_app_name(app)
if app_name:
result = device.uninstall_package(app_name)
if not result.startswith("Success"):
LOG.warning("error uninstalling package %s %s" %
(device.__get_serial__(), result))
else:
LOG.warning("Can't find app_name for %s" % app)
if self.is_pri_app:
remount(device)
for pushed_file in self.pushed_hap_file:
device.execute_shell_command("rm -r %s" % pushed_file)
def install_hap(self, device, hap_file):
if self.is_pri_app:
LOG.info("Install hap as privileged app {}".format(hap_file))
hap_name = os.path.basename(hap_file).replace(".hap", "")
try:
with TemporaryDirectory(prefix=hap_name) as temp_dir:
zif_file = zipfile.ZipFile(hap_file)
zif_file.extractall(path=temp_dir)
entry_app = os.path.join(temp_dir, "Entry.app")
push_dest_dir = os.path.join("/system/priv-app/", hap_name)
device.execute_shell_command("rm -rf " + push_dest_dir,
output_flag=False)
device.push_file(entry_app, os.path.join(
push_dest_dir + os.path.basename(entry_app)))
device.push_file(hap_file, os.path.join(
push_dest_dir + os.path.basename(hap_file)))
self.pushed_hap_file.append(os.path.join(
push_dest_dir + os.path.basename(hap_file)))
device.reboot()
except RuntimeError as exception:
msg = "Install hap app failed with error {}".format(exception)
LOG.error(msg)
raise Exception(msg)
except Exception as exception:
msg = "Install hap app failed with exception {}".format(
exception)
LOG.error(msg)
raise Exception(msg)
finally:
zif_file.close()
else:
# push_dest = "/%s" % "sdcard"
# push_dest = "%s/%s" % (push_dest, os.path.basename(hap_file))
# device.push_file(hap_file, push_dest)
# self.pushed_hap_file.append(push_dest)
# output = device.execute_shell_command("bm install -p " + push_dest)
output = device.hdc_command("install %s" % hap_file)
# if not output.startswith("Success"):
# output = output.strip()
# if "[ERROR_GET_BUNDLE_INSTALLER_FAILED]" not in output.upper():
# raise AppInstallError(
# "Failed to install %s on %s. Reason:%s" %
# (push_dest, device.__get_serial__(), output))
# else:
# LOG.info("'[ERROR_GET_BUNDLE_INSTALLER_FAILED]' occurs, "
# "retry install hap")
# exec_out = self.retry_install_hap(
# device, "bm install -p " + push_dest)
# if not exec_out.startswith("Success"):
# raise AppInstallError(
# "Retry failed,Can't install %s on %s. Reason:%s" %
# (push_dest, device.__get_serial__(), exec_out))
# else:
LOG.debug("Install %s, and output = %s" % (hap_file, output))
@classmethod
def retry_install_hap(cls, device, command):
if device.usb_type == DeviceConnectorType.hdc:
# hdc -t UID -s tcp:IP:PORT
real_command = ["hdc", "-t", str(device.device_sn), "-s",
"tcp:%s:%s" % (str(device.host), str(device.port)),
"shell", command]
message = "%s execute command: %s" % \
(convert_serial(device.device_sn), " ".join(real_command))
LOG.info(message)
exec_out = ""
for wait_count in range(1, 4):
LOG.debug("retry times:%s, wait %ss" %
(wait_count, (wait_count * 10)))
time.sleep(wait_count * 10)
exec_out = exec_cmd(real_command)
if exec_out and exec_out.startswith("Success"):
break
if not exec_out:
exec_out = "System is not in %s" % ["Windows", "Linux", "Darwin"]
LOG.info("retry install hap result is: [%s]" % exec_out.strip())
return exec_out
def remount(device):
device.enable_hdc_root()
cmd = "target mount" \
if device.usb_type == DeviceConnectorType.hdc else "remount"
device.hdc_command(cmd)
device.execute_shell_command("remount")
device.execute_shell_command("mount -o rw,remount /cust")
device.execute_shell_command("mount -o rw,remount /product")
device.execute_shell_command("mount -o rw,remount /hw_product")
device.execute_shell_command("mount -o rw,remount /version")
device.execute_shell_command("mount -o rw,remount /%s" % "system")
def keep_screen_on(device):
device.execute_shell_command("svc power stayon true")
def get_install_args(device, app_name, original_args=None):
"""To obtain all the args of app install
Args:
original_args: the argus configure in .config file
device : the device will be installed app
app_name : the name of the app which will be installed
Returns:
All the args
"""
if original_args is None:
original_args = []
new_args = original_args[:]
try:
sdk_version = device.get_property("ro.build.version.sdk")
if int(sdk_version) > 22:
new_args.append("-g")
except TypeError as type_error:
LOG.error("Obtain the sdk version failed with exception {}".format(
type_error))
except ValueError as value_error:
LOG.error("Obtain the sdk version failed with exception {}".format(
value_error))
if app_name.endswith(".apex"):
new_args.append("--apex")
return " ".join(new_args)
def run_command(device, command):
LOG.debug("The command:{} is running".format(command))
stdout = None
if command.strip() == "remount":
remount(device)
elif command.strip() == "reboot":
device.reboot()
elif command.strip() == "reboot-delay":
pass
else:
stdout = device.execute_shell_command(command)
LOG.debug("Run command result: %s" % (stdout if stdout else ""))
return stdout
def junit_para_parse(device, junit_paras, prefix_char="-e"):
"""To parse the para of junit
Args:
device: the device running
junit_paras: the para dict of junit
prefix_char: the prefix char of parsed cmd
Returns:
the new para using in a command like -e testFile xxx
-e coverage true...
"""
ret_str = []
path = "/%s/%s/%s/%s" % ("data", "local", "tmp", "ajur")
include_file = "%s/%s" % (path, "includes.txt")
exclude_file = "%s/%s" % (path, "excludes.txt")
if not isinstance(junit_paras, dict):
LOG.warning("The para of junit is not the dict format as required")
return ""
# Disable screen keyguard
disable_key_guard = junit_paras.get('disable-keyguard')
if not disable_key_guard or disable_key_guard[0].lower() != 'false':
from xdevice_extension._core.driver.drivers import disable_keyguard
disable_keyguard(device)
for para_name in junit_paras.keys():
path = "/%s/%s/%s/%s/" % ("data", "local", "tmp", "ajur")
if para_name.strip() == 'test-file-include-filter':
for file_name in junit_paras[para_name]:
device.push_file(file_name, include_file)
device.execute_shell_command(
'chown -R shell:shell %s' % path)
ret_str.append(" ".join([prefix_char, 'testFile', include_file]))
elif para_name.strip() == "test-file-exclude-filter":
for file_name in junit_paras[para_name]:
device.push_file(file_name, include_file)
device.execute_shell_command(
'chown -R shell:shell %s' % path)
ret_str.append(" ".join([prefix_char, 'notTestFile',
exclude_file]))
elif para_name.strip() == "test" or para_name.strip() == "class":
result = _get_class(junit_paras, prefix_char, para_name.strip())
ret_str.append(result)
elif para_name.strip() == "include-annotation":
ret_str.append(" ".join([prefix_char, "annotation",
",".join(junit_paras[para_name])]))
elif para_name.strip() == "exclude-annotation":
ret_str.append(" ".join([prefix_char, "notAnnotation",
",".join(junit_paras[para_name])]))
else:
ret_str.append(" ".join([prefix_char, para_name,
",".join(junit_paras[para_name])]))
return " ".join(ret_str)
def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
"""To parse the para of junit
Args:
device: the device running
junit_paras: the para dict of junit
prefix_char: the prefix char of parsed cmd
Returns:
the new para using in a command like -e testFile xxx
-e coverage true...
"""
ret_str = []
path = "/%s/%s/%s/%s" % ("data", "local", "tmp", "ajur")
include_file = "%s/%s" % (path, "includes.txt")
exclude_file = "%s/%s" % (path, "excludes.txt")
if not isinstance(junit_paras, dict):
LOG.warning("The para of junit is not the dict format as required")
return ""
# Disable screen keyguard
disable_key_guard = junit_paras.get('disable-keyguard')
if not disable_key_guard or disable_key_guard[0].lower() != 'false':
from xdevice_extension._core.driver.drivers import disable_keyguard
disable_keyguard(device)
for para_name in junit_paras.keys():
path = "/%s/%s/%s/%s/" % ("data", "local", "tmp", "ajur")
if para_name.strip() == 'test-file-include-filter':
for file_name in junit_paras[para_name]:
device.push_file(file_name, include_file)
device.execute_shell_command(
'chown -R shell:shell %s' % path)
ret_str.append(prefix_char + " ".join(['testFile', include_file]))
elif para_name.strip() == "test-file-exclude-filter":
for file_name in junit_paras[para_name]:
device.push_file(file_name, include_file)
device.execute_shell_command(
'chown -R shell:shell %s' % path)
ret_str.append(prefix_char + " ".join(['notTestFile',
exclude_file]))
elif para_name.strip() == "test" or para_name.strip() == "class":
result = _get_class(junit_paras, prefix_char, para_name.strip())
ret_str.append(result)
elif para_name.strip() == "include-annotation":
ret_str.append(prefix_char + " ".join(
['annotation', ",".join(junit_paras[para_name])]))
elif para_name.strip() == "exclude-annotation":
ret_str.append(prefix_char + " ".join(
['notAnnotation', ",".join(junit_paras[para_name])]))
else:
ret_str.append(prefix_char + " ".join(
[para_name, ",".join(junit_paras[para_name])]))
return " ".join(ret_str)
def timeout_callback(proc):
try:
LOG.error("Error: execute command timeout.")
LOG.error(proc.pid)
if platform.system() != "Windows":
os.killpg(proc.pid, signal.SIGKILL)
else:
subprocess.call(
["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
str(proc.pid)], shell=False)
except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
LOG.exception("timeout callback exception: %s" % error, exc_info=False)
def _get_class(junit_paras, prefix_char, para_name):
if not junit_paras.get(para_name):
return ""
result = ""
if prefix_char == "-e":
result = " %s class " % prefix_char
elif prefix_char == "--":
result = " %sclass " % prefix_char
elif prefix_char == "-s":
result = " %s class " % prefix_char
test_items = []
for test in junit_paras.get(para_name):
test_item = test.split("#")
if len(test_item) == 1 or len(test_item) == 2:
test_item = "%s" % test
test_items.append(test_item)
elif len(test_item) == 3:
test_item = "%s#%s" % (test_item[1], test_item[2])
test_items.append(test_item)
else:
raise ParamError("The parameter %s %s is error" % (
prefix_char, para_name))
if not result:
LOG.debug("there is unsolved prefix char: %s ." % prefix_char)
return result + ",".join(test_items)
def gtest_para_parse(gtest_paras, runner):
"""To parse the para of gtest
Args:
gtest_paras: the para dict of gtest
Returns:
the new para using in gtest
"""
ret_str = []
if not isinstance(gtest_paras, dict):
LOG.warning("The para of gtest is not the dict format as required")
return ""
for para in gtest_paras.keys():
if para.strip() == 'test-file-include-filter':
case_list = []
files = gtest_paras.get(para)
for case_file in files:
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(case_file, flags, modes),
"r") as file_desc:
case_list.extend(file_desc.read().splitlines())
runner.add_instrumentation_arg("gtest_filter", ":".join(case_list))
return " ".join(ret_str)
def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
if not ignore_keys and not isinstance(ignore_keys, list):
ignore_keys = ["class", "test"]
lines = junit_para_str.split(prefix_char)
normal_lines = []
for line in lines:
line = line.strip()
if line:
items = line.split()
if items[0].strip() in ignore_keys:
continue
normal_lines.append("{} {}".format(prefix_char, line))
return " ".join(normal_lines)
def get_app_name(hap_app):
hap_name = os.path.basename(hap_app).replace(".hap", "")
app_name = ""
with TemporaryDirectory(prefix=hap_name) as temp_dir:
zif_file = zipfile.ZipFile(hap_app)
zif_file.extractall(path=temp_dir)
config_json_file = os.path.join(temp_dir, "config.json")
if not os.path.exists(config_json_file):
LOG.debug("config.json not in %s.hap" % hap_name)
else:
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(config_json_file, flags, modes),
"r") as file_desc:
attrs = json.loads(file_desc.read())
if "app" in attrs.keys() and \
"bundleName" in attrs.get("app", dict()).keys():
app_name = attrs["app"]["bundleName"]
LOG.info("obtain the app name {} from json "
"successfully".format(app_name))
else:
LOG.debug("'app' or 'bundleName' not "
"in %s.hap/config.json" % hap_name)
zif_file.close()
return app_name

View File

@ -0,0 +1,238 @@
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import platform
import subprocess
import signal
import stat
from tempfile import NamedTemporaryFile
from xdevice import ShellHandler
from xdevice import platform_logger
from xdevice import get_plugin
from xdevice import Plugin
LOG = platform_logger("Utils")
def get_filename_extension(file_path):
_, fullname = os.path.split(file_path)
filename, ext = os.path.splitext(fullname)
return filename, ext
def start_standing_subprocess(cmd, pipe=subprocess.PIPE, return_result=False):
"""Starts a non-blocking subprocess that is going to continue running after
this function returns.
A subprocess group is actually started by setting sid, so we can kill all
the processes spun out from the subprocess when stopping it. This is
necessary in case users pass in pipe commands.
Args:
cmd: Command to start the subprocess with.
pipe: pipe to get execution result
return_result: return execution result or not
Returns:
The subprocess that got started.
"""
sys_type = platform.system()
process = subprocess.Popen(cmd, stdout=pipe, shell=False,
preexec_fn=None if sys_type == "Windows"
else os.setsid)
if not return_result:
return process
else:
rev = process.stdout.read()
return rev.decode("utf-8").strip()
def stop_standing_subprocess(process):
"""Stops a subprocess started by start_standing_subprocess.
Catches and ignores the PermissionError which only happens on Macs.
Args:
process: Subprocess to terminate.
"""
try:
sys_type = platform.system()
signal_value = signal.SIGINT if sys_type == "Windows" \
else signal.SIGTERM
os.kill(process.pid, signal_value)
except (PermissionError, AttributeError, FileNotFoundError,
SystemError) as error:
LOG.error("stop standing subprocess error '%s'" % error)
def get_decode(stream):
if not isinstance(stream, str) and not isinstance(stream, bytes):
ret = str(stream)
else:
try:
ret = stream.decode("utf-8", errors="ignore")
except (ValueError, AttributeError, TypeError):
ret = str(stream)
return ret
def is_proc_running(pid, name=None):
if platform.system() == "Windows":
proc_sub = subprocess.Popen(["C:\\Windows\\System32\\tasklist"],
stdout=subprocess.PIPE,
shell=False)
proc = subprocess.Popen(["C:\\Windows\\System32\\findstr", "%s" % pid],
stdin=proc_sub.stdout,
stdout=subprocess.PIPE, shell=False)
else:
# /bin/ps -ef | /bin/grep -v grep | /bin/grep -w pid
proc_sub = subprocess.Popen(["/bin/ps", "-ef"],
stdout=subprocess.PIPE,
shell=False)
proc_v_sub = subprocess.Popen(["/bin/grep", "-v", "grep"],
stdin=proc_sub.stdout,
stdout=subprocess.PIPE,
shell=False)
proc = subprocess.Popen(["/bin/grep", "-w", "%s" % pid],
stdin=proc_v_sub.stdout,
stdout=subprocess.PIPE, shell=False)
(out, _) = proc.communicate()
out = get_decode(out).strip()
LOG.debug("check %s proc running output: %s", pid, out)
if out == "":
return False
else:
return True if name is None else out.find(name) != -1
def create_dir(path):
"""Creates a directory if it does not exist already.
Args:
path: The path of the directory to create.
"""
full_path = os.path.abspath(os.path.expanduser(path))
if not os.path.exists(full_path):
os.makedirs(full_path, exist_ok=True)
def modify_props(device, local_prop_file, target_prop_file, new_props):
"""To change the props if need
Args:
device: the device to modify props
local_prop_file : the local file to save the old props
target_prop_file : the target prop file to change
new_props : the new props
Returns:
True : prop file changed
False : prop file no need to change
"""
is_changed = False
device.pull_file(target_prop_file, local_prop_file)
old_props = {}
changed_prop_key = []
lines = []
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(local_prop_file, flags, modes), "r") as old_file:
lines = old_file.readlines()
if lines:
lines[-1] = lines[-1] + '\n'
for line in lines:
line = line.strip()
if not line.startswith("#") and line.find("=") > 0:
key_value = line.split("=")
if len(key_value) == 2:
old_props[line.split("=")[0]] = line.split("=")[1]
for key, value in new_props.items():
if key not in old_props.keys():
lines.append("".join([key, "=", value, '\n']))
is_changed = True
elif old_props.get(key) != value:
changed_prop_key.append(key)
is_changed = True
if is_changed:
local_temp_prop_file = NamedTemporaryFile(mode='w', prefix='build',
suffix='.tmp', delete=False)
for index, line in enumerate(lines):
if not line.startswith("#") and line.find("=") > 0:
key = line.split("=")[0]
if key in changed_prop_key:
lines[index] = "".join([key, "=", new_props[key], '\n'])
local_temp_prop_file.writelines(lines)
local_temp_prop_file.close()
device.push_file(local_temp_prop_file.name, target_prop_file)
device.execute_shell_command(" ".join(["chmod 644", target_prop_file]))
LOG.info("Changed the system property as required successfully")
os.remove(local_temp_prop_file.name)
return is_changed
def convert_ip(origin_ip):
addr = origin_ip.strip().split(".")
if len(addr) == 4:
return addr[0] + "." + '*'*len(addr[1]) + "." + '*'*len(addr[2]) + \
"." + addr[-1]
else:
return origin_ip
def convert_port(port):
_port = str(port)
if len(_port) >= 2:
return "{}{}{}".format(_port[0], "*" * (len(_port) - 2), _port[-1])
else:
return "*{}".format(_port[-1])
def convert_serial(serial):
if serial.startswith("local_"):
return "local_" + '*'*(len(serial)-6)
elif serial.startswith("remote_"):
return "remote_" + convert_ip(serial.split("_")[1])
else:
length = len(serial)//3
return serial[0:length] + "*"*(len(serial)-length*2) + serial[-length:]
def get_shell_handler(request, parser_type):
suite_name = request.root.source.test_name
parsers = get_plugin(Plugin.PARSER, parser_type)
if parsers:
parsers = parsers[:1]
parser_instances = []
for listener in request.listeners:
listener.device_sn = request.config.environment.devices[0].device_sn
for parser in parsers:
parser_instance = parser.__class__()
parser_instance.suite_name = suite_name
parser_instance.listeners = request.listeners
parser_instances.append(parser_instance)
handler = ShellHandler(parser_instances)
return handler
def check_path_legal(path):
if path and " " in path:
return "\"%s\"" % path
return path

0
figures/icon-caution.gif Executable file → Normal file
View File

Before

Width:  |  Height:  |  Size: 580 B

After

Width:  |  Height:  |  Size: 580 B

0
figures/icon-danger.gif Executable file → Normal file
View File

Before

Width:  |  Height:  |  Size: 580 B

After

Width:  |  Height:  |  Size: 580 B

0
figures/icon-note.gif Executable file → Normal file
View File

Before

Width:  |  Height:  |  Size: 394 B

After

Width:  |  Height:  |  Size: 394 B

0
figures/icon-notice.gif Executable file → Normal file
View File

Before

Width:  |  Height:  |  Size: 406 B

After

Width:  |  Height:  |  Size: 406 B

0
figures/icon-tip.gif Executable file → Normal file
View File

Before

Width:  |  Height:  |  Size: 253 B

After

Width:  |  Height:  |  Size: 253 B

0
figures/icon-warning.gif Executable file → Normal file
View File

Before

Width:  |  Height:  |  Size: 580 B

After

Width:  |  Height:  |  Size: 580 B

0
run.bat Executable file → Normal file
View File

11
run.sh Executable file → Normal file
View File

@ -1,6 +1,6 @@
#!/usr/bin/env sh
#!/bin/bash
#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Copyright (C) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -14,6 +14,8 @@
# limitations under the License.
#
set -e
error()
{
echo "$1"
@ -29,10 +31,10 @@ fi
$PYTHON -c 'import sys; exit(1) if sys.version_info.major < 3 or sys.version_info.minor < 7 else exit(0)' || \
error "Python3.7 or higher version required!"
cd $(dirname "$0") || error "Failure to change direcory!"
cd $(dirname "$0") || error "Failure to change directory!"
$PYTHON -c "import easy_install" || error "Please install setuptools first!"
if [ ! -d $TOOLS ]; then
if [ ! -d "$TOOLS" ]; then
error "$TOOLS directory not exists"
fi
@ -53,3 +55,4 @@ do
done
$PYTHON -m xdevice "$@"
exit 0

2
setup.py Executable file → Normal file
View File

@ -25,7 +25,7 @@ def main():
setup(name='xdevice',
description='xdevice test framework',
url='',
package_dir={'': 'src', 'adapter': 'adapter'},
package_dir={'': 'src'},
packages=['xdevice',
'xdevice._core',
'xdevice._core.build',

0
src/xdevice/__main__.py Executable file → Normal file
View File

0
src/xdevice/_core/command/console.py Executable file → Normal file
View File

0
src/xdevice/_core/config/config_manager.py Executable file → Normal file
View File

0
src/xdevice/_core/config/resource_manager.py Executable file → Normal file
View File

0
src/xdevice/_core/constants.py Executable file → Normal file
View File

4
src/xdevice/_core/driver/device_test.py Executable file → Normal file
View File

@ -243,6 +243,10 @@ class DeviceTestDriver(IDriver):
if configs["testcases_path"]:
sys.path.insert(1, configs["testcases_path"])
# apply data to devicetest module about resource path
request = configs.get('request', None)
if request:
sys.ecotest_resource_path = request.config.resource_path
# run devicetest
from _devicetest.devicetest.main import DeviceTest
device_test = DeviceTest(test_list=test_list, configs=configs,

0
src/xdevice/_core/driver/drivers_lite.py Executable file → Normal file
View File

0
src/xdevice/_core/driver/parser_lite.py Executable file → Normal file
View File

5
src/xdevice/_core/environment/device_lite.py Executable file → Normal file
View File

@ -43,7 +43,8 @@ from _core.utils import check_mode
LOG = platform_logger("DeviceLite")
TIMEOUT = 90
RETRY_ATTEMPTS = 0
HDC = "litehdc.exe"
HDC = "hdc_std.exe"
DEFAULT_BAUD_RATE = 115200
def get_hdc_path():
@ -510,7 +511,7 @@ class ComController:
self.com = None
self.serial_port = device.get("com") if device.get("com") else None
self.baud_rate = int(device.get("baud_rate")) if device.get(
"baud_rate") else 115200
"baud_rate") else DEFAULT_BAUD_RATE
self.timeout = int(device.get("timeout")) if device.get(
"timeout") else TIMEOUT

20
src/xdevice/_core/environment/dmlib_lite.py Executable file → Normal file
View File

@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import time
import re
@ -25,6 +26,7 @@ from _core.exception import LiteDeviceTimeout
from _core.exception import LiteDeviceConnectError
from _core.exception import LiteDeviceExecuteCommandError
from _core.logger import platform_logger
from _core.utils import convert_port
__all__ = ["generate_report", "LiteHelper"]
@ -279,7 +281,7 @@ class LiteHelper:
error_no="00402")
LOG.info("local_%s execute command shell %s with timeout %ss" %
(com.port, command, str(timeout)))
(convert_port(com.port), command, str(timeout)))
if isinstance(command, str):
command = command.encode("utf-8")
@ -302,7 +304,8 @@ class LiteHelper:
error_no="00402")
LOG.info(
"local_%s execute command shell %s" % (com.port, command))
"local_%s execute command shell %s" % (convert_port(com.port),
command))
command = command.encode("utf-8")
if command[-2:] != b"\r\n":
command = command.rstrip() + b'\r\n'
@ -340,8 +343,9 @@ class LiteHelper:
elif "put" == command_type:
url = base_url + target_save_path + command
data = open(local_source_dir + command, "rb")
response = requests.put(url=url, headers=headers, data=data)
with open(local_source_dir + command, "rb") as data:
response = requests.put(url=url, headers=headers,
data=data)
if response.status_code == 200:
LOG.info("{} upload file to {} "
"success".format(local_source_dir,
@ -355,10 +359,10 @@ class LiteHelper:
url = base_url + target_save_path + command
response = requests.get(url=url, headers=headers, stream=True)
if response.status_code == 200:
file_name = open(local_source_dir + command + "bak", "wb")
for file_data in response.iter_content(chunk_size=512):
file_name.write(file_data)
file_name.close()
with open(local_source_dir + command + "bak", "wb") \
as file_name:
for file_data in response.iter_content(chunk_size=512):
file_name.write(file_data)
LOG.info("from {} download file to {} success".format(
target_save_path + command,
local_source_dir))

0
src/xdevice/_core/environment/manager_env.py Executable file → Normal file
View File

0
src/xdevice/_core/environment/manager_lite.py Executable file → Normal file
View File

0
src/xdevice/_core/exception.py Executable file → Normal file
View File

0
src/xdevice/_core/executor/concurrent.py Executable file → Normal file
View File

0
src/xdevice/_core/executor/listener.py Executable file → Normal file
View File

0
src/xdevice/_core/executor/request.py Executable file → Normal file
View File

0
src/xdevice/_core/executor/scheduler.py Executable file → Normal file
View File

2
src/xdevice/_core/executor/source.py Executable file → Normal file
View File

@ -383,6 +383,8 @@ def _get_test_type(config_file, test_driver, ext):
if ext in [".py", ".js", ".dex", ".hap", ".bin"] \
and ext in EXT_TYPE_DICT.keys():
test_type = EXT_TYPE_DICT[ext]
elif ext in EXT_TYPE_DICT.keys():
test_type = DeviceTestType.hap_test
else:
test_type = DeviceTestType.cpp_test
return test_type

0
src/xdevice/_core/interface.py Executable file → Normal file
View File

3
src/xdevice/_core/logger.py Executable file → Normal file
View File

@ -378,7 +378,8 @@ class EncryptFileHandler(RotatingFileHandler):
# baseFilename is the attribute in FileHandler
base_file_name = getattr(self, "baseFilename", None)
return open(base_file_name, self.mode)
with open(base_file_name, self.mode) as encrypt_file:
return encrypt_file
def emit(self, record):
try:

0
src/xdevice/_core/plugin.py Executable file → Normal file
View File

0
src/xdevice/_core/report/__main__.py Executable file → Normal file
View File

0
src/xdevice/_core/report/encrypt.py Executable file → Normal file
View File

View File

@ -551,8 +551,8 @@ class VisionHelper:
if not os.path.exists(self.template_name):
LOG.error("template file not exists")
return ""
file_context = open(self.template_name).read()
with open(self.template_name) as file_temp:
file_context = file_temp.read()
file_context = self._render_key("", ReportConstant.title_name,
title_name, file_context)
file_context = self._render_exec_info(file_context, exec_info)

0
src/xdevice/_core/report/suite_reporter.py Executable file → Normal file
View File

129
src/xdevice/_core/resource/config/user_config.xml Executable file → Normal file
View File

@ -1,66 +1,63 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Copyright (c) 2020 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<user_config>
<environment>
<device type="com"
label="wifiiot">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com></com>
<type>deploy</type>
<baud_rate>115200</baud_rate>
</serial>
</device>
<device type="com"
label="ipcamera">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device type="com"
label="ipcamera">
<ip></ip>
<port></port>
</device>
</environment>
<testcases>
<dir></dir>
<server label="NfsServer">
<ip></ip>
<port></port>
<dir></dir>
<username></username>
<password></password>
<remote></remote>
</server>
</testcases>
<resource>
<dir></dir>
</resource>
<loglevel>INFO</loglevel>
</user_config>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright (c) 2020 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<user_config>
<environment>
<device type="com" label="wifiiot">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>20</timeout>
</serial>
<serial>
<com></com>
<type>deploy</type>
<baud_rate>115200</baud_rate>
</serial>
</device>
<device type="com" label="ipcamera">
<serial>
<com></com>
<type>cmd</type>
<baud_rate>115200</baud_rate>
<data_bits>8</data_bits>
<stop_bits>1</stop_bits>
<timeout>1</timeout>
</serial>
</device>
<device type="com" label="ipcamera">
<ip></ip>
<port></port>
</device>
</environment>
<testcases>
<dir></dir>
<server label="NfsServer">
<ip></ip>
<port></port>
<dir></dir>
<username></username>
<password></password>
<remote></remote>
</server>
</testcases>
<resource>
<dir></dir>
</resource>
<loglevel>INFO</loglevel>
</user_config>

View File

@ -26,7 +26,6 @@
}
div.logo {
background-image: url('');
width: 80px;
height: 30px;
}
@ -468,4 +467,4 @@
<!--{failures.context}-->
</div>
</body>
</html>
</html>

0
src/xdevice/_core/testkit/json_parser.py Executable file → Normal file
View File

13
src/xdevice/_core/testkit/kit_lite.py Executable file → Normal file
View File

@ -284,7 +284,7 @@ class MountKit(ITestKit):
for mount_file in self.mount_list:
source = mount_file.get("source")
if not source:
raise TypeError("The source of MountKit cant be empty "
raise TypeError("The source of MountKit can not be empty "
"in Test.json!")
source = source.replace("$testcases/", "").\
replace("$resources/", "")
@ -314,10 +314,10 @@ class MountKit(ITestKit):
if (str(get_local_ip()) == linux_host) and (
linux_directory == ("/data%s" % testcases_dir)):
return
ip = remote_info.get("ip", "")
remote_ip = remote_info.get("ip", "")
port = remote_info.get("port", "")
remote_dir = remote_info.get("dir", "")
if not ip or not port or not remote_dir:
if not remote_ip or not port or not remote_dir:
LOG.warning("nfs server's ip or port or dir is empty")
return
for _file in file_local_paths:
@ -327,7 +327,7 @@ class MountKit(ITestKit):
if not is_remote.lower() == "false":
try:
import paramiko
client = paramiko.Transport(ip, int(port))
client = paramiko.Transport(remote_ip, int(port))
client.connect(username=remote_info.get("username"),
password=remote_info.get("password"))
sftp = paramiko.SFTPClient.from_transport(client)
@ -383,9 +383,8 @@ class MountKit(ITestKit):
command="umount {}".format(mounted_dir),
timeout=2)
if result.find("Resource busy") == -1:
device.execute_command_with_timeout(command="rm -r {}".
format(mounted_dir)
, timeout=1)
device.execute_command_with_timeout(
command="rm -r {}".format(mounted_dir), timeout=1)
if status:
break
LOG.info("umount failed,try "

0
src/xdevice/_core/utils.py Executable file → Normal file
View File

4
src/xdevice/variables.py Executable file → Normal file
View File

@ -23,12 +23,16 @@ from dataclasses import dataclass
__all__ = ["Variables"]
SRC_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
SRC_ADAPTER_DIR = os.path.abspath(os.path.join(SRC_DIR, "adapter"))
MODULES_DIR = os.path.abspath(os.path.dirname(__file__))
TOP_DIR = os.path.abspath(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
TOP_ADAPTER_DIR = os.path.abspath(os.path.join(TOP_DIR, "adapter"))
sys.path.insert(0, SRC_DIR)
sys.path.insert(1, MODULES_DIR)
sys.path.insert(2, TOP_DIR)
sys.path.insert(3, SRC_ADAPTER_DIR)
sys.path.insert(4, TOP_ADAPTER_DIR)
@dataclass