mirror of
https://github.com/xemu-project/xemu.git
synced 2025-02-11 23:27:43 +00:00
python/aqmp: fully separate from qmp.QEMUMonitorProtocol
After this patch, qemu.aqmp.legacy.QEMUMonitorProtocol no longer inherits from qemu.qmp.QEMUMonitorProtocol. To do this, several inherited methods need to be explicitly re-defined. (Licensing: This is copying and modifying GPLv2-only code into a GPLv2-only file.) Signed-off-by: John Snow <jsnow@redhat.com> Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> Reviewed-by: Beraldo Leal <bleal@redhat.com> Message-id: 20220330172812.3427355-4-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
parent
9fcd3930e0
commit
0c78ebf722
@ -16,18 +16,18 @@ This class pretends to be qemu.qmp.QEMUMonitorProtocol.
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
import qemu.qmp
|
||||
|
||||
from .error import QMPError
|
||||
from .protocol import Runstate, SocketAddrT
|
||||
from .qmp_client import QMPClient
|
||||
@ -59,12 +59,11 @@ class QMPBadPortError(QMPError):
|
||||
"""
|
||||
|
||||
|
||||
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
|
||||
class QEMUMonitorProtocol:
|
||||
def __init__(self, address: SocketAddrT,
|
||||
server: bool = False,
|
||||
nickname: Optional[str] = None):
|
||||
|
||||
# pylint: disable=super-init-not-called
|
||||
self._aqmp = QMPClient(nickname)
|
||||
self._aloop = asyncio.get_event_loop()
|
||||
self._address = address
|
||||
@ -88,7 +87,18 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
|
||||
return self._aqmp.greeting._asdict()
|
||||
return None
|
||||
|
||||
# __enter__ and __exit__ need no changes
|
||||
def __enter__(self: _T) -> _T:
|
||||
# Implement context manager enter function.
|
||||
return self
|
||||
|
||||
def __exit__(self,
|
||||
# pylint: disable=duplicate-code
|
||||
# see https://github.com/PyCQA/pylint/issues/3619
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_val: Optional[BaseException],
|
||||
exc_tb: Optional[TracebackType]) -> None:
|
||||
# Implement context manager exit function.
|
||||
self.close()
|
||||
|
||||
@classmethod
|
||||
def parse_address(cls, address: str) -> SocketAddrT:
|
||||
@ -142,7 +152,22 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
|
||||
)
|
||||
)
|
||||
|
||||
# Default impl of cmd() delegates to cmd_obj
|
||||
def cmd(self, name: str,
|
||||
args: Optional[Dict[str, object]] = None,
|
||||
cmd_id: Optional[object] = None) -> QMPMessage:
|
||||
"""
|
||||
Build a QMP command and send it to the QMP Monitor.
|
||||
|
||||
@param name: command name (string)
|
||||
@param args: command arguments (dict)
|
||||
@param cmd_id: command id (dict, list, string or int)
|
||||
"""
|
||||
qmp_cmd: QMPMessage = {'execute': name}
|
||||
if args:
|
||||
qmp_cmd['arguments'] = args
|
||||
if cmd_id:
|
||||
qmp_cmd['id'] = cmd_id
|
||||
return self.cmd_obj(qmp_cmd)
|
||||
|
||||
def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
|
||||
return self._sync(
|
||||
|
Loading…
x
Reference in New Issue
Block a user