!1422 【UT】add test shell print huge size message.

Merge pull request !1422 from liurantao/master
This commit is contained in:
openharmony_ci 2024-08-06 10:07:10 +00:00 committed by Gitee
commit cef57f2d44
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 87 additions and 37 deletions

View File

@ -232,7 +232,7 @@ class GP(object):
@classmethod
def get_version(cls):
version = f"v1.0.3a"
version = f"v1.0.4a"
return version
@ -278,12 +278,13 @@ def rmdir(path):
try:
if sys.platform == "win32":
if os.path.isfile(path):
subprocess.call(f"del {path}".split())
os.remove(path)
else:
subprocess.call(f"del /Q {path}".split())
shutil.rmtree(path)
else:
subprocess.call(f"rm -rf {path}".split())
except OSError:
print(f"Error: {path} : cannot remove")
pass
@ -295,7 +296,7 @@ def get_remote_path(path):
return f"{GP.remote_path}/{path}"
def _get_local_md5(local):
def get_local_md5(local):
md5_hash = hashlib.md5()
with open(local, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
@ -332,9 +333,12 @@ def check_rate(cmd, expected_rate):
return rate > expected_rate
def _check_dir(local, remote):
def _get_md5sum(remote):
cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \;'
def check_dir(local, remote, is_single_dir=False):
def _get_md5sum(remote, is_single_dir=False):
if is_single_dir:
cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
else:
cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \;'
result = subprocess.check_output(cmd.split()).decode()
return result
@ -349,7 +353,7 @@ def _check_dir(local, remote):
return "PermissionError"
except FileNotFoundError:
return "FileNotFoundError"
print("remote" + remote)
print("remote:" + remote)
output = _get_md5sum(remote)
print(output)
@ -358,11 +362,15 @@ def _check_dir(local, remote):
if len(line) < 32: # length of MD5
continue
expected_md5, file_name = line.split()[:2]
if GP.remote_path in remote:
if is_single_dir:
file_name = file_name.replace(f"{remote}", "")
elif GP.remote_path in remote:
file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\")
else:
file_name = file_name.split(remote)[1].replace("/", "\\")
file_path = os.path.join(os.getcwd(), GP.local_path) + file_name # 构建完整的文件路径
if is_single_dir:
file_path = os.path.join(os.getcwd(), local) + file_name
print(file_path)
actual_md5 = _calculate_md5(file_path)
print(f"Expected: {expected_md5}")
@ -379,12 +387,12 @@ def _check_file(local, remote):
if remote.startswith("/proc"):
local_size = os.path.getsize(local)
if local_size > 0:
return true
return True
else:
return false
return False
else:
cmd = f"shell md5sum {remote}"
local_md5 = _get_local_md5(local)
local_md5 = get_local_md5(local)
return check_shell(cmd, local_md5)
@ -473,7 +481,7 @@ def check_hdc_cmd(cmd, pattern=None, **args):
if os.path.isfile(local):
return _check_file(local, remote)
else:
return _check_dir(local, remote)
return check_dir(local, remote)
elif cmd.startswith("install"):
bundle = args.get("bundle", "invalid")
opt = " ".join(cmd.split()[1:-1])
@ -556,17 +564,19 @@ def gen_file(path, size):
os.close(fd)
def gen_zero_file(path, size):
fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
os.write(fd, b'0' * size)
os.close(fd)
def create_file_with_size(path, size):
fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
os.write(fd, b'\0' * size)
os.close(fd)
def prepare_source():
print(f"in prepare {GP.local_path},wait for 2 mins.")
current_path = os.getcwd()
os.mkdir(GP.local_path)
def gen_file_set():
print("generating empty file ...")
gen_file(os.path.join(GP.local_path, "empty"), 0)
@ -580,20 +590,29 @@ def prepare_source():
gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)
print("generating soft link ...")
os.symlink("small", os.path.join(GP.local_path, "soft_small"))
try:
os.symlink("small", os.path.join(GP.local_path, "soft_small"))
except FileExistsError:
print("soft_small already exists")
print("generating text file ...")
gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)
print("generating package dir ...")
os.mkdir(os.path.join(GP.local_path, "package"))
if not os.path.exists(os.path.join(GP.local_path, "package")):
os.mkdir(os.path.join(GP.local_path, "package"))
for i in range(1, 6):
gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2)
print("generating deep dir ...")
deepth = 4
deep_path = os.path.join(GP.local_path, "deep_dir")
os.mkdir(deep_path)
if not os.path.exists(deep_path):
os.mkdir(deep_path)
for deep in range(deepth):
deep_path = os.path.join(deep_path, f"deep_dir{deep}")
os.mkdir(deep_path)
if not os.path.exists(deep_path):
os.mkdir(deep_path)
gen_file(os.path.join(deep_path, "deep"), 102400)
print("generating dir with file ...")
@ -617,6 +636,29 @@ def prepare_source():
rmdir(dir_path)
os.mkdir(dir_path)
print("generating version file ...")
gen_file(os.path.join(GP.local_path, GP.get_version()), 0)
def prepare_source():
if os.path.exists(os.path.join(GP.local_path, GP.get_version())):
print(f"hdc test version is {GP.get_version}, check ok, skip prepare.")
return
print(f"in prepare {GP.local_path},wait for 2 mins.")
current_path = os.getcwd()
if os.path.exists(GP.local_path):
#打开local_path遍历其中的文件删除hap hsp以外的所有文件
for file in os.listdir(GP.local_path):
if file.endswith(".hap") or file.endswith(".hsp"):
continue
file_path = os.path.join(GP.local_path, file)
rmdir(file_path)
else:
os.mkdir(GP.local_path)
gen_file_set()
def add_prepare_source():
deep_path = os.path.join(GP.local_path, "deep_test_dir")
@ -802,14 +844,14 @@ def make_multiprocess_file(local, remote, mode, num, task_type):
for _ in range(num):
if mode == "send":
end_of_file_name = os.path.basename(local)
if _check_dir(local, f"{remote}/{end_of_file_name}"):
if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True):
res *= 1
else:
res *= 0
elif mode == "recv":
end_of_file_name = os.path.basename(remote)
local = os.path.join(local, end_of_file_name)
if _check_dir(f"{local}", f"{remote}"):
if check_dir(f"{local}", f"{remote}", is_single_dir=True):
res *= 1
else:
res *= 0
@ -875,7 +917,10 @@ def check_cmd_time(cmd, pattern, duration, times):
res = []
for i in range(times):
start_in = time.time() * 1000
check_shell(cmd, pattern, fetch=fetchable)
if pattern is None:
subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
else:
check_shell(cmd, pattern, fetch=fetchable)
start_out = time.time() * 1000
res.append(start_out - start_in)

View File

@ -123,13 +123,15 @@ def test_file_error():
assert check_hdc_cmd(f"file send {get_local_path('small')} {get_remote_path('it_small_true')}")
@pytest.mark.repeat(1)
@pytest.mark.repeat(5)
def test_recv_dir():
if os.path.exists(get_local_path('it_problem_dir')):
rmdir(get_local_path('it_problem_dir'))
assert check_hdc_cmd(f"shell rm -rf {get_remote_path('it_problem_dir')}")
assert check_hdc_cmd(f"shell rm -rf {get_remote_path('problem_dir')}")
assert make_multiprocess_file(get_local_path('problem_dir'), get_remote_path(''), 'send', 1, "dir")
assert check_hdc_cmd(f"shell mv {get_remote_path('problem_dir')} {get_remote_path('it_problem_dir')}")
assert make_multiprocess_file(get_local_path(''), get_remote_path('it_problem_dir'), 'recv', 1, "dir")
if os.path.exists(get_local_path('it_problem_dir')):
rmdir(get_local_path('it_problem_dir'))
@pytest.mark.repeat(5)
@ -221,11 +223,7 @@ def test_target_cmd():
assert check_hdc_targets()
time.sleep(3)
check_hdc_cmd("target boot")
start_time = time.time()
run_command_with_timeout("hdc wait", 60) # reboot takes up to 60 seconds
end_time = time.time()
print(f"command exec time {end_time - start_time}")
assert (end_time - start_time) > 5 # Reboot takes at least 5 seconds
time.sleep(60) # reboot needs at least 60 seconds
assert (check_hdc_cmd("target mount", "Mount finish") or
check_hdc_cmd("target mount", "[Fail]Operate need running as root") or
check_hdc_cmd("target mount", "Remount successful.")
@ -309,6 +307,15 @@ def test_shell_cmd_timecost():
times=10)
def test_shell_huge_cat():
assert check_hdc_cmd(f"file send {get_local_path('word_100M.txt')} {get_remote_path('it_word_100M.txt')}")
assert check_cmd_time(
cmd=f"shell cat {get_remote_path('it_word_100M.txt')}",
pattern=None,
duration=10000, # 10 seconds
times=10)
def test_hdcd_rom():
baseline = 2200 # 2200KB
assert check_rom(baseline)
@ -348,10 +355,8 @@ def run_main():
GP.init()
if not os.path.exists(GP.local_path):
prepare_source()
else:
update_source()
prepare_source()
update_source()
choice_default = ""
parser = argparse.ArgumentParser()