新增支持HDC动态监听设备状态

Signed-off-by: deveco_xdevice <liguangjie1@huawei.com>
This commit is contained in:
deveco_xdevice 2023-06-07 12:17:48 +08:00
parent 5f64a11765
commit b62a205e8d
6 changed files with 133 additions and 112 deletions

View File

@ -92,9 +92,13 @@ def perform_device_action(func):
ConnectionAbortedError) as error: # pylint:disable=undefined-variable
self.log.error("error type: %s, error: %s" %
(error.__class__.__name__, error))
cmd = "{} target boot".format(HdcHelper.CONNECTOR_NAME)
self.log.info("re-execute {} reset".format(HdcHelper.CONNECTOR_NAME))
exec_cmd(cmd)
# check hdc if running
if not HdcHelper.check_if_hdc_running():
LOG.debug("%s is not running, set device %s %s false" % (
HdcHelper.CONNECTOR_NAME, self.device_sn, ConfigConst.recover_state))
self.set_recover_state(False)
callback_to_outer(self, "recover failed")
raise error
callback_to_outer(self, "error:%s, prepare to recover" % error)
if not self.recover_device():
LOG.debug("Set device %s %s false" % (
@ -305,12 +309,9 @@ class Device(IDevice):
def _reboot_until_online(self):
self._do_reboot()
self._wait_for_device_online()
def reboot(self):
self._reboot_until_online()
self.device_state_monitor.wait_for_device_available(
self.reboot_timeout)
self.enable_hdc_root()
self.device_log_collector.restart_catch_device_log()
@ -591,7 +592,7 @@ class Device(IDevice):
@summary: Reconnect the device.
'''
if not self.is_harmony:
if not self.wait_for_boot_completion(waittime):
if not self.wait_for_boot_completion():
raise Exception("Reconnect timed out.")
if self._proxy:
@ -613,20 +614,13 @@ class Device(IDevice):
return self._proxy
return None
def wait_for_boot_completion(self, waittime=60 * 15, reconnect=False):
def wait_for_boot_completion(self):
"""Waits for the device to boot up.
Returns:
True if the device successfully finished booting, False otherwise.
"""
if not self.wait_for_device_not_available(
DEFAULT_UNAVAILABLE_TIMEOUT):
LOG.error("Did not detect device {} becoming unavailable "
"after reboot".format(convert_serial(self.device_sn)))
self._wait_for_device_online()
self.device_state_monitor.wait_for_device_available(
self.reboot_timeout)
return True
return self.device_state_monitor.wait_for_device_available(self.reboot_timeout)
def get_local_port(self):
from devicetest.utils.util import get_forward_port

View File

@ -96,6 +96,8 @@ class HdcMonitor:
self.monitoring = False
self.server = device_connector
self.devices = []
self.last_msg_len = 0
self.changed = True
@staticmethod
def get_instance(host, port=None, device_connector=None):
@ -112,8 +114,6 @@ class HdcMonitor:
Starts the monitoring.
"""
try:
connector_name = HDC_STD_NAME if HdcHelper.is_hdc_std() else HDC_NAME
self.init_hdc(connector_name)
server_thread = threading.Thread(target=self.loop_monitor,
name="HdcMonitor", args=())
server_thread.setDaemon(True)
@ -137,6 +137,41 @@ class HdcMonitor:
"port", port))
time.sleep(1)
def _init_hdc_connection(self):
if self.main_hdc_connection is not None:
return
# set all devices disconnect
devices = [item for item in self.devices]
devices.reverse()
for local_device1 in devices:
local_device1.device_state = DeviceState.OFFLINE
self.server.device_changed(local_device1)
connector_name = HDC_STD_NAME if HdcHelper.is_hdc_std() else HDC_NAME
self.init_hdc(connector_name)
self.connection_attempt = 0
self.monitoring = False
while self.main_hdc_connection is None:
self.main_hdc_connection = self.open_hdc_connection()
if self.main_hdc_connection is None:
self.connection_attempt += 1
if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
self.is_stop = True
LOG.error(
"HdcMonitor attempt %s, can't connect to hdc "
"for Device List Monitoring" %
str(self.connection_attempt))
raise HdcError(
"HdcMonitor cannot connect hdc server(%s %s),"
" please check!" %
(self.channel.get("host"),
str(self.channel.get("post"))))
LOG.debug(
"HdcMonitor Connection attempts: %s" %
str(self.connection_attempt))
time.sleep(2)
def stop(self):
"""
Stops the monitoring.
@ -153,7 +188,7 @@ class HdcMonitor:
except (socket.error, socket.gaierror, socket.timeout) as _:
LOG.error("HdcMonitor close socket exception")
HdcMonitor.MONITOR_MAP.clear()
LOG.debug("HdcMonitor hdc monitor stop!")
LOG.debug("HdcMonitor {} monitor stop!".format(HdcHelper.CONNECTOR_NAME))
LOG.debug("HdcMonitor map is %s" % HdcMonitor.MONITOR_MAP)
def loop_monitor(self):
@ -164,60 +199,28 @@ class HdcMonitor:
while not self.is_stop:
try:
if self.main_hdc_connection is None:
self.main_hdc_connection = self.open_hdc_connection()
if self.main_hdc_connection is None:
self.connection_attempt += 1
if self.connection_attempt > MAX_CONNECT_ATTEMPT_COUNT:
self.is_stop = True
LOG.error(
"HdcMonitor attempt %s, can't connect to hdc "
"for Device List Monitoring" %
str(self.connection_attempt))
raise HdcError(
"HdcMonitor cannot connect hdc server(%s %s),"
" please check!" %
(self.channel.get("host"),
str(self.channel.get("post"))))
LOG.debug(
"HdcMonitor Connection attempts: %s" %
str(self.connection_attempt))
time.sleep(2)
else:
self._init_hdc_connection()
if self.main_hdc_connection is not None:
LOG.debug(
"HdcMonitor Connected to hdc for device "
"monitoring, main_hdc_connection is %s" %
self.main_hdc_connection)
self.list_targets()
time.sleep(2)
time.sleep(1)
except (HdcError, Exception) as _:
self.handle_exception_monitor_loop()
break
time.sleep(2)
def handle_exception_monitor_loop(self):
LOG.debug("Handle exception monitor loop: %s" %
self.main_hdc_connection)
if self.main_hdc_connection is None:
return
self.main_hdc_connection.close()
LOG.debug("Handle exception monitor loop, main hdc connection closed, "
"main hdc connection: %s" % self.main_hdc_connection)
def device_list_monitoring(self):
request = HdcHelper.form_hdc_request("host:track-devices")
HdcHelper.write(self.main_hdc_connection, request)
resp = HdcHelper.read_hdc_response(self.main_hdc_connection)
if not resp.okay:
LOG.error("HdcMonitor hdc rejected shell command")
raise Exception(resp.message)
else:
LOG.debug(
'HdcMonitor execute command success:send device_list '
'monitoring request')
return True
self.main_hdc_connection.close()
self.main_hdc_connection = None
def _get_device_instance(self, items, os_type):
device = get_plugin(plugin_type=Plugin.DEVICE, plugin_id=os_type)[0]
@ -225,10 +228,11 @@ class HdcMonitor:
device_instance.__set_serial__(items[0])
device_instance.host = self.channel.get("host")
device_instance.port = self.channel.get("port")
LOG.debug("Dmlib get device instance %s %s %s" %
(device_instance.device_sn,
device_instance.host, device_instance.port))
device_instance.device_state = DeviceState.get_state(items[1])
if self.changed:
LOG.debug("Dmlib get device instance %s %s %s" %
(device_instance.device_sn,
device_instance.host, device_instance.port))
device_instance.device_state = DeviceState.get_state(items[3])
return device_instance
def update_devices(self, param_array_list):
@ -245,8 +249,8 @@ class HdcMonitor:
local_device2.device_state:
local_device1.device_state = local_device2.device_state
self.server.device_changed(local_device1)
param_array_list.remove(local_device2)
break
param_array_list.remove(local_device2)
break
if k == 0:
self.devices.remove(local_device1)
@ -278,10 +282,9 @@ class HdcMonitor:
"host is %s, port is %s" % (str(exception),
self.channel.get("host"),
self.channel.get("port")))
return sock
return None
@classmethod
def start_hdc(cls, connector=HDC_NAME, kill=False, local_port=None):
def start_hdc(self, connector=HDC_NAME, kill=False, local_port=None):
"""Starts the hdc host side server.
Args:
@ -301,22 +304,35 @@ class HdcMonitor:
error_print=False, redirect=True)
def list_targets(self):
if self.main_hdc_connection and not self.monitoring:
self.server.monitor_lock.acquire()
self.monitoring_list_targets()
len_buf = HdcHelper.read(self.main_hdc_connection,
DATA_UNIT_LENGTH)
length = struct.unpack("!I", len_buf)[0]
LOG.debug("had received length is: %s" % length)
if length >= 8:
self.connection_attempt = 0
self.monitoring = True
self.process_incoming_target_data(length)
self.server.monitor_lock.release()
if self.main_hdc_connection:
self.server.monitor_lock.acquire(timeout=1)
try:
self.monitoring_list_targets()
len_buf = HdcHelper.read(self.main_hdc_connection,
DATA_UNIT_LENGTH)
length = struct.unpack("!I", len_buf)[0]
if length >= 0:
if self.last_msg_len != length:
LOG.debug("had received length is: %s" % length)
self.last_msg_len = length
self.changed = True
else:
self.changed = False
self.connection_attempt = 0
self.process_incoming_target_data(length)
except Exception as e:
LOG.error(e)
raise e
finally:
self.server.monitor_lock.release()
def monitoring_list_targets(self):
HdcHelper.handle_shake(self.main_hdc_connection)
request = HdcHelper.form_hdc_request('list targets')
if not self.monitoring:
HdcHelper.handle_shake(self.main_hdc_connection)
request = HdcHelper.form_hdc_request("alive")
HdcHelper.write(self.main_hdc_connection, request)
self.monitoring = True
request = HdcHelper.form_hdc_request('list targets -v')
HdcHelper.write(self.main_hdc_connection, request)
def process_incoming_target_data(self, length):
@ -327,14 +343,15 @@ class HdcMonitor:
lines = data_str.split('\n')
for line in lines:
items = line.strip().split('\t')
if not items[0]:
# Example: sn USB Offline localhost hdc
if not items[0] or len(items) < 5:
continue
items.append(DeviceState.ONLINE.value)
device_instance = self._get_device_instance(
items, DeviceOsType.default)
local_array_list.append(device_instance)
else:
LOG.debug("please check device actually.[%s]" % data_str.strip())
if self.changed:
LOG.debug("please check device actually.[%s]" % data_str.strip())
self.update_devices(local_array_list)
@staticmethod
@ -483,7 +500,7 @@ class SyncService:
length = self.swap32bit_from_array(pull_result, 0)
self.device.log.debug("do_pull_file: %s" % str(length))
else:
raise IndexError(str(index_error))
raise IndexError(str(index_error)) from index_error
if length > SYNC_DATA_MAX:
raise HdcError("Receiving too much data.")
@ -515,7 +532,7 @@ class SyncService:
file_path = os.path.join(local, child)
if os.path.isdir(file_path):
self.push_file(
file_path, "%s/%s" % (remote, child),
file_path, "%s/%s" % (remote, child),
is_create=False)
else:
self.do_push_file(file_path, "%s/%s" % (remote, child))
@ -723,20 +740,30 @@ class SyncService:
class HdcHelper:
CONNECTOR_NAME = ""
@staticmethod
def check_if_hdc_running(timeout=30):
LOG.debug("Check if {} is running, timeout is {}s".format(
HdcHelper.CONNECTOR_NAME, timeout))
index = 1
while index < timeout:
if is_proc_running(HdcHelper.CONNECTOR_NAME):
return True
index = index + 1
time.sleep(1)
return False
@staticmethod
def push_file(device, local, remote, is_create=False,
timeout=DEFAULT_TIMEOUT):
device.log.info("{} execute command: {} file send {} to {}".format(convert_serial(device.device_sn),
HdcHelper.CONNECTOR_NAME,
local, remote))
device.log.info("{} execute command: {} file send {} to {}".format(
convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, local, remote))
HdcHelper._operator_file("file send", device, local, remote, timeout)
@staticmethod
def pull_file(device, remote, local, is_create=False,
timeout=DEFAULT_TIMEOUT):
device.log.info("{} execute command: {} file recv {} to {}".format(convert_serial(device.device_sn),
HdcHelper.CONNECTOR_NAME,
remote, local))
device.log.info("{} execute command: {} file recv {} to {}".format(
convert_serial(device.device_sn), HdcHelper.CONNECTOR_NAME, remote, local))
HdcHelper._operator_file("file recv", device, remote, local, timeout)
@staticmethod
@ -805,6 +832,7 @@ class HdcHelper:
try:
if not timeout:
timeout = DEFAULT_TIMEOUT
with HdcHelper.socket(host=device.host, port=device.port,
timeout=timeout) as sock:
output_flag = kwargs.get("output_flag", True)
@ -838,10 +866,10 @@ class HdcHelper:
if not Scheduler.is_execute:
raise ExecuteTerminate()
return resp
except socket.timeout as _:
device.log.error("ShellCommandUnresponsiveException: %s shell %s timeout[%sS]" % (
except socket.timeout as error:
device.log.error("ShellCommandUnresponsiveException: {} shell {} timeout[{}S]".format(
convert_serial(device.device_sn), command, str(timeout / 1000)))
raise ShellCommandUnresponsiveException()
raise ShellCommandUnresponsiveException() from error
finally:
if receiver:
receiver.__done__()
@ -975,14 +1003,14 @@ class HdcHelper:
def socket(host=None, port=None, timeout=None):
end = time.time() + 10 * 60
sock = None
hdc_connection = HdcMonitor.MONITOR_MAP.get(host, "127.0.0.1")
while host not in HdcMonitor.MONITOR_MAP or \
HdcMonitor.MONITOR_MAP[host].main_hdc_connection is None:
hdc_connection.main_hdc_connection is None:
LOG.debug("Host: %s, port: %s, HdcMonitor map is %s" % (
host, port, HdcMonitor.MONITOR_MAP))
if host in HdcMonitor.MONITOR_MAP:
LOG.debug("Monitor main hdc connection is %s" %
HdcMonitor.MONITOR_MAP[host].main_hdc_connection)
hdc_connection.main_hdc_connection)
if time.time() > end:
raise HdcError("Cannot detect HDC monitor!")
time.sleep(2)
@ -1028,7 +1056,7 @@ class HdcHelper:
length = struct.unpack("!I", reply)[0]
data_buf = HdcHelper.read(sock, length)
HdcHelper.reply_to_string(data_buf)
LOG.debug("result is {}".format(data_buf))
LOG.debug("result %s" % data_buf)
@staticmethod
def is_hdc_std():
@ -1150,9 +1178,9 @@ def process_command_ret(ret, receiver):
if ret != "" and receiver:
receiver.__read__(ret)
receiver.__done__()
except Exception as _:
except Exception as error:
LOG.exception("Error generating log report.", exc_info=False)
raise ReportException()
raise ReportException() from error
if ret != "" and not receiver:
lines = ret.split("\n")

View File

@ -120,7 +120,7 @@ class ManagerDevice(IDeviceManager, IFilter):
LOG.debug("Find: release list con lock")
self.list_con.release()
def apply_device(self, device_option, timeout=10):
def apply_device(self, device_option, timeout=3):
LOG.debug("Apply device: apply lock con lock")
self.lock_con.acquire()
@ -128,12 +128,8 @@ class ManagerDevice(IDeviceManager, IFilter):
device = self.allocate_device_option(device_option)
if device:
return device
if hasattr(sys, ConfigConst.env_pool_cache):
wait_delta = 1
else:
wait_delta = 4
LOG.debug("Wait for available device founded")
self.wait_times += wait_delta
self.wait_times += 1
if self.wait_times > timeout:
self.lock_con.wait(timeout)
else:
@ -238,14 +234,14 @@ class ManagerDevice(IDeviceManager, IFilter):
(device_instance, device_instance.host,
device_instance.port, device_instance.device_sn,
device_instance.usb_type))
device_instance.device_state = DeviceState.get_state(
idevice.device_state)
device_instance.device_state = idevice.device_state
device_instance.test_device_state = \
TestDeviceState.get_test_device_state(
device_instance.device_state)
device_instance.device_state_monitor = \
DeviceStateMonitor(device_instance)
if idevice.device_state == DeviceState.ONLINE:
if idevice.device_state == DeviceState.ONLINE or \
idevice.device_state == DeviceState.CONNECTED:
device_instance.get_device_type()
self.append_device_by_sort(device_instance)
device = device_instance

View File

@ -34,6 +34,8 @@ class TestDeviceState(Enum):
return TestDeviceState.NOT_AVAILABLE
elif device_state == DeviceState.ONLINE:
return TestDeviceState.ONLINE
elif device_state == DeviceState.CONNECTED:
return TestDeviceState.ONLINE
elif device_state == DeviceState.OFFLINE:
return TestDeviceState.NOT_AVAILABLE
elif device_state == DeviceState.RECOVERY:
@ -49,12 +51,13 @@ class DeviceState(Enum):
BOOTLOADER = "bootloader"
OFFLINE = "offline"
ONLINE = "device"
CONNECTED = "connected"
RECOVERY = "recovery"
@staticmethod
def get_state(state):
for device_state in DeviceState:
if device_state.value == state:
if device_state.value == state.lower():
return device_state
return None

View File

@ -128,7 +128,7 @@ class EnvPool(object):
device.remove_ports()
self._unload_manager()
def _apply_device(self, selector, timeout=10):
def _apply_device(self, selector, timeout=3):
LOG.info("Apply device in pool")
for manager_type, manager in self._filters.items():
if not manager.__filter_selector__(selector):

View File

@ -156,7 +156,7 @@ class EnvironmentManager(object):
for _, device in used_devices.items():
self.reset_device(device)
def apply_device(self, device_option, timeout=10):
def apply_device(self, device_option, timeout=3):
LOG.debug("Apply device from managers:%s" % self.managers)
for manager_type, manager in self.managers.items():
support_labels = getattr(manager, "support_labels", [])