developtools_hdc/scripts/hdc_test.py
WANGTAO 8357fa75b3 fix alarms
Signed-off-by: WANGTAO <wangtao577@huawei.com>
2024-03-27 10:14:51 +08:00

461 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1.运行环境
# pycharm
# Python 3.10
# 测试框架 pytest (pycharm setting -> Tools -> Testing -> pytest)
# 2.配置测试脚本
# 需配置hdc环境变量-NORMAL_HEAD为可执行程序hdc
# 配置TEST_FILE_PATH为测试用文件所在路径,取resource测试文件放入该路径下
# 测试tcp模式连接设备需设备PC连同一网络(手机热点)配置TCP_CFG['ip']为设备的ip
import os
import random
import subprocess
import socket
import time
import threading
TCP_CFG = {
'ip': '',
'port': "8710",
}
NORMAL_HEAD = "hdc "
REMOTE_PATH = "/data/local/tmp/"
IP = ""
TEST_FILE_PATH = "{}{}".format(os.path.abspath("D:/Download/sdk/"), '\\')
HAP_ONE = {
'HAP_NAME': "entry-default-signed-debug.hap",
'PACKAGE_NAME': "com.hmos.diagnosis",
}
TCP_CONN = {
'bright_screen': "shell \"power-shell setmode 602\"",
'tmode': "{}{}".format("tmode port ", TCP_CFG['port']),
'tconn': "{}{}{}{}".format("tconn ", TCP_CFG['ip'], ":", TCP_CFG['port']),
}
PATH = {
'file_send': {
'local': "{}{}".format(TEST_FILE_PATH, "hdc.log"),
'remote': "{}{}".format(REMOTE_PATH, "hdc.log")
},
'dir_send': {
'local': "{}{}".format(TEST_FILE_PATH, "log"),
'remote': "{}{}".format(REMOTE_PATH, "log")
},
'file_recv': {
'remote': "{}{}".format(REMOTE_PATH, "hdc.log"),
'local': "{}{}".format(TEST_FILE_PATH, "dev_data")
},
'dir_recv': {
'remote': "{}{}".format(REMOTE_PATH, "log"),
'local': "{}{}".format(TEST_FILE_PATH, "hdc\\log")
},
'file_empty': {
'local': "{}{}".format(TEST_FILE_PATH, "empty.txt"),
'remote': "{}{}".format(REMOTE_PATH, "empty.txt")
}
}
EXTRA_COMMANDS = {
'global': ["kill -r", "kill", "-l5 start", "start -r", "-v", "version", "checkserver"],
'smode': ["smode -r", "smode"],
'boot': ["target boot"],
'choose': "-t "
}
BASIC_COMMANDS = {
'shell': [
"shell \"ls\""
],
'component': [
"list targets", "list targets -v", "target mount"
],
'file_task': [
"{}{}{}{}".format("file send ", PATH['file_send']['local'], " ", PATH['file_send']['remote']),
"{}{}{}{}".format("file send ", PATH['file_empty']['local'], " ", PATH['file_empty']['remote']),
"{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ", PATH['dir_send']['remote']),
"{}{}{}{}".format("file recv ", PATH['file_recv']['remote'], " ", PATH['file_recv']['local']),
"{}{}{}{}".format("file recv ", PATH['file_empty']['remote'], " ", PATH['file_empty']['local']),
"{}{}{}{}".format("file recv ", PATH['dir_recv']['remote'], " ", PATH['dir_recv']['local'])
],
'fport_task': [
"fport tcp:1234 tcp:1080",
"rport tcp:13608 localabstract:8888BananaBanana",
"fport ls",
"fport rm tcp:1234 tcp:1080"
],
'install_task': [
"{}{}{}".format("install ", TEST_FILE_PATH, HAP_ONE['HAP_NAME']),
"{}{}".format("uninstall ", HAP_ONE['PACKAGE_NAME'])
]
}
TEST_FILES = {
'one': {
'send_file': "{}{}".format(TEST_FILE_PATH, "100M.txt"),
'send_file_one': "{}{}".format(REMOTE_PATH, "a100M.txt"),
'send_file_two': "{}{}".format(REMOTE_PATH, "c100M.txt"),
'recv_file': "{}{}".format(REMOTE_PATH, "recv100M.txt"),
'recv_file_one': "{}{}".format(TEST_FILE_PATH, "recv100M.txt"),
'recv_file_two': "{}{}".format(TEST_FILE_PATH, "recv200M.txt"),
},
'two': {
'send_file': "{}{}".format(TEST_FILE_PATH, "hdc_file.log"),
'send_file_one': "{}{}".format(REMOTE_PATH, "send_one.log"),
'send_file_two': "{}{}".format(REMOTE_PATH, "send_two.log"),
'recv_file': "{}{}".format(REMOTE_PATH, "hdcd.log"),
'recv_file_one': "{}{}".format(TEST_FILE_PATH, "recv_one.log"),
'recv_file_two': "{}{}".format(TEST_FILE_PATH, "recv_two.log"),
}
}
def command_judge(cmd):
ret = False
cmd_parts = cmd.split()
if 'file send' in cmd and cmd[:9] == 'file send' and len(cmd_parts) == 4:
ret = True
if 'file recv' in cmd and cmd[:9] == 'file recv' and len(cmd_parts) == 4:
ret = True
if 'install' in cmd and cmd[:7] == 'install' and len(cmd_parts) == 2:
ret = True
return ret
def command_callback(cmd, head, need_del, res=""):
cmd_parts = cmd.split()
if 'file send' in cmd and cmd[:9] == 'file send' and len(cmd_parts) == 4:
if need_del:
assert "FileTransfer finish" in res
check_file_send(cmd_parts[2], cmd_parts[3], head, need_del)
if 'file recv' in cmd and cmd[:9] == 'file recv' and len(cmd_parts) == 4:
if need_del:
assert "FileTransfer finish" in res
check_file_recv(cmd_parts[2], cmd_parts[3], head, need_del)
if 'install' in cmd and cmd[:7] == 'install' and len(cmd_parts) == 2:
check_install(head, res)
if cmd == 'smode':
time.sleep(4)
check_root(head)
if cmd == 'smode -r':
time.sleep(4)
check_user(head)
if cmd == 'target boot':
time.sleep(35)
def check_file_send(local_file, remote_file, head, need_del):
ret = get_win_file_type(local_file)
file_type = ""
if 'file' in ret:
file_type = '-f'
if "dir" in ret:
file_type = '-d'
res = run_command_stdout("{}{}{}{}{}".format("shell \"[ ", file_type, " ", remote_file,
" ] && echo yes || echo no\""), head)
assert 'yes' in res
if file_type == '-d' or 'empty.txt' in local_file:
rm_send_file(remote_file, head, need_del)
return
local_md5 = get_win_md5(local_file)
remote_md5 = get_md5(remote_file, head)
rm_send_file(remote_file, head, need_del)
assert local_md5 == remote_md5
print("check_file_send success ", res)
def rm_send_file(file_path, head, need_del):
if need_del:
run_command("{}{}{}{}".format("shell \"rm -rf ", file_path, "\"", head))
def check_file_recv(remote_file, local_file, head, need_del):
if 'dir' in get_win_file_type(local_file) or 'empty.txt' in local_file:
rm_recv_file(local_file, need_del)
return
res = run_command_stdout("{}{}{}".format("attrib ", local_file, ""))
local_md5 = get_win_md5(local_file)
remote_md5 = get_md5(remote_file, head)
assert '-' not in res
if local_md5 != remote_md5:
print("check_file_recv fail ", remote_file, local_file)
assert local_md5 == remote_md5
rm_recv_file(local_file, need_del)
print("check_file_recv success ", res)
def get_win_file_type(file_path):
ret = run_command_stdout("{}{}{}".format("if exist ", file_path, " echo yes"), '')
assert "yes" in ret
res = run_command_stdout("{}{}{}".format("dir/ad ", file_path, " >nul 2>nul && echo dir || echo file"), '')
return res
def rm_recv_file(file_path, need_del):
if need_del:
res = get_win_file_type(file_path)
if "dir" in res:
run_command("{}{}".format("rmdir /s/q ", file_path), "")
if "file" in res:
run_command("{}{}".format("del ", file_path), "")
def check_install(head, res):
print("check_install")
print(res)
if "msg:install bundle successfully." not in res:
print("install msg error")
assert False
res = run_command_stdout("shell \"bm dump -a\"", head)
if HAP_ONE['PACKAGE_NAME'] in res:
print("check_install success ", HAP_ONE['PACKAGE_NAME'])
assert True
else:
assert False
def check_root(head):
res = run_command_stdout("shell \"whoami\"", head)
print("check_root res: ", res)
assert 'root' in res
def check_user(head):
res = run_command_stdout("shell \"whoami\"", head)
print("check_user res: ", res)
assert 'shell' in res
return 'shell' in res
def get_devs(head=NORMAL_HEAD):
res = run_command_stdout(BASIC_COMMANDS['component'][0], head)
devs = res.split()
print(devs)
return devs
def tmode_to_tcp():
run_command(TCP_CONN['tmode'])
res = run_command_stdout(TCP_CONN['tconn'])
print(res)
if "Connect OK" in res:
return True
return False
def remote_server_start(server_head):
global IP
cmd = "{}{}".format(server_head, "-m")
print(cmd)
os.popen(cmd)
def run_command(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
command = "{}{}".format(head, cmd)
if head != '':
print(command)
subprocess.Popen(command,
shell=True).communicate()
if need_callback:
command_callback(cmd, head, need_del)
def run_command_stdout(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
command = "{}{}".format(head, cmd)
if head != '' and 'echo' not in cmd:
print(command)
dec = "UTF-8"
if head == '':
dec = 'gbk'
res = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE).communicate()
res = res[0].decode(dec)
if need_callback:
command_callback(cmd, head, need_del, res)
return res
def run_commands(cmds, head=NORMAL_HEAD, need_del=True):
for command in cmds:
run_command(command, head, need_del)
def get_basic_commands():
commands = []
for tasks in BASIC_COMMANDS.values():
commands += tasks
return commands
def run_global_cmd():
global_commands = EXTRA_COMMANDS['global']
run_commands(global_commands)
def run_split_commands(commands, head=NORMAL_HEAD):
for command in commands:
if command_judge(command):
run_command_stdout(command, head, False)
else:
run_command(command, head, False)
def run_device_cmd(head=NORMAL_HEAD):
run_command_stdout("{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ", PATH['dir_recv']['remote']),
head, False)
for smd in EXTRA_COMMANDS['smode']:
run_command(smd)
commands = get_basic_commands() + EXTRA_COMMANDS['boot']
if smd == "smode -r":
for item in commands:
if "{}{}".format(REMOTE_PATH, "log") in item:
print(item)
commands.remove(item)
run_split_commands(commands, head)
def extract_ip():
global IP
st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
st.connect(('10.255.255.255', 1))
IP = st.getsockname()[0]
except Exception:
IP = ""
finally:
st.close()
return IP
def mix_path(path, i):
ret = path.find('.')
if ret > 0:
return "{}{}{}".format(path[:ret], i, path[ret:])
return path
def file_send(send_path, recv_path, i, wait_time=0):
time.sleep(wait_time)
res = run_command_stdout("{}{}{}{}".format("file send ", send_path, " ", mix_path(recv_path, str(i))))
print(res)
def file_recv(remote_path, recv_path, i, wait_time=0):
time.sleep(wait_time)
res = run_command_stdout("{}{}{}{}".format("file recv ", remote_path, " ", mix_path(recv_path, str(i))))
print(res)
def get_win_md5(file_path):
real_md5 = "win_md5"
send_md5 = run_command_stdout("{}{}{}".format("certutil -hashfile ", os.path.abspath(file_path), " MD5"),
"").split()
for x in send_md5:
if len(x) == 32:
real_md5 = x
return real_md5
def get_md5(file_path, head=NORMAL_HEAD):
md5 = run_command_stdout("{}{}{}".format("shell \"md5sum ", file_path, "\""), head)
return md5.split()[0]
class TestCommands:
def test_file_cmd(self):
print("HDC TEST: start test_file_cmd\n")
for item in TEST_FILES:
send_file = TEST_FILES[item]['send_file']
send_file_one = TEST_FILES[item]['send_file_one']
send_file_two = TEST_FILES[item]['send_file_two']
recv_file = TEST_FILES[item]['recv_file']
recv_file_one = TEST_FILES[item]['recv_file_one']
recv_file_two = TEST_FILES[item]['recv_file_two']
run_command_stdout("{}{}{}{}".format("file send ", os.path.abspath(PATH['file_empty']['local']), ' ',
PATH['file_empty']['remote']), NORMAL_HEAD, False, False)
run_command_stdout("{}{}{}{}".format("file send ", os.path.abspath(send_file), ' ', recv_file),
NORMAL_HEAD, False)
for i in range(10):
wait_time = random.uniform(0, 1)
if i == 0:
wait_time = 0
print("{}{}".format("HDC TEST: start test_file_cmd \n", str(i)))
send_one = threading.Thread(target=file_send, args=(os.path.abspath(send_file), send_file_one, i))
send_two = threading.Thread(target=file_send,
args=(os.path.abspath(send_file), send_file_two, i, wait_time))
recv_one = threading.Thread(target=file_recv, args=(recv_file, os.path.abspath(recv_file_one), i))
recv_two = threading.Thread(target=file_recv,
args=(recv_file, os.path.abspath(recv_file_two), i, wait_time))
send_one.start()
send_two.start()
recv_one.start()
recv_two.start()
send_one.join()
send_two.join()
recv_one.join()
recv_two.join()
def test_global_server(self):
print("HDC TEST: start test_global_server_cmd\n")
run_global_cmd()
def test_device_cmd(self):
print("HDC TEST: start test_device_cmd\n")
devs = get_devs()
if len(devs) == 1:
run_device_cmd()
if len(devs) > 1:
for dev in devs:
run_device_cmd("{}{}{}{}".format(NORMAL_HEAD, EXTRA_COMMANDS['choose'], dev, " "))
def test_tcp_mode(self):
print("HDC TEST: start test_tcp_mode\n")
extract_ip()
global IP
if len(IP) == 0 or TCP_CFG['ip'] == '':
print("请连接热点 配置TCP_CFG")
return
if tmode_to_tcp():
commands = get_basic_commands()
run_commands(commands, NORMAL_HEAD, False)
def test_remote_server(self):
time.sleep(10)
print("HDC TEST: start test_remote_server\n")
extract_ip()
ret = run_command_stdout("kill")
if 'finish' not in ret:
print('test_remote_server kill server failed')
return
global IP
server_head = "{}{}{}{}{}".format(NORMAL_HEAD, "-s ", IP, ":", "8710 ")
thread_start = threading.Thread(target=remote_server_start(server_head))
thread_start.start()
thread_start.join()
devs = get_devs(server_head)
for dev in devs:
head = "{}{}{}{}".format(server_head, EXTRA_COMMANDS['choose'], dev, " ")
run_command_stdout("{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ",
PATH['dir_recv']['remote']), head, False)
threading.Thread(target=run_split_commands(get_basic_commands(), head)).start()
run_command("kill", server_head)