gecko-dev/testing/webcompat/client.py

355 lines
12 KiB
Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import asyncio
import contextlib
import time
from urllib.parse import quote
import webdriver
class Client:
def __init__(self, session, event_loop):
self.session = session
self.event_loop = event_loop
self.content_blocker_loaded = False
@property
def current_url(self):
return self.session.url
@property
def alert(self):
return self.session.alert
@property
def context(self):
return self.session.send_session_command("GET", "moz/context")
@context.setter
def context(self, context):
self.session.send_session_command("POST", "moz/context", {"context": context})
@contextlib.contextmanager
def using_context(self, context):
orig_context = self.context
needs_change = context != orig_context
if needs_change:
self.context = context
try:
yield
finally:
if needs_change:
self.context = orig_context
def wait_for_content_blocker(self):
if not self.content_blocker_loaded:
with self.using_context("chrome"):
self.session.execute_async_script(
"""
const done = arguments[0],
signal = "safebrowsing-update-finished";
function finish() {
Services.obs.removeObserver(finish, signal);
done();
}
Services.obs.addObserver(finish, signal);
"""
)
self.content_blocker_loaded = True
@property
def keyboard(self):
return self.session.actions.sequence("key", "keyboard_id")
@property
def mouse(self):
return self.session.actions.sequence(
"pointer", "pointer_id", {"pointerType": "mouse"}
)
@property
def pen(self):
return self.session.actions.sequence(
"pointer", "pointer_id", {"pointerType": "touch"}
)
@property
def touch(self):
return self.session.actions.sequence(
"pointer", "pointer_id", {"pointerType": "pen"}
)
@property
def wheel(self):
return self.session.actions.sequence("wheel", "wheel_id")
@property
def modifier_key(self):
if self.session.capabilities["platformName"] == "mac":
return "\ue03d" # meta (command)
else:
return "\ue009" # control
def inline(self, doc):
return "data:text/html;charset=utf-8,{}".format(quote(doc))
async def navigate(self, url, timeout=None, await_console_message=None):
if timeout is not None:
old_timeout = self.session.timeouts.page_load
self.session.timeouts.page_load = timeout
if self.session.test_config.get("use_pbm") or self.session.test_config.get(
"use_strict_etp"
):
print("waiting for content blocker...")
self.wait_for_content_blocker()
if await_console_message is not None:
console_message = self.promise_console_message(await_console_message)
await self.session.bidi_session.session.subscribe(events=["log.entryAdded"])
try:
self.session.url = url
except webdriver.error.TimeoutException as e:
if timeout is None:
raise e
if await_console_message is not None:
await console_message
await self.session.bidi_session.session.unsubscribe(
events=["log.entryAdded"]
)
if timeout is not None:
self.session.timeouts.page_load = old_timeout
def back(self):
self.session.back()
def switch_to_frame(self, frame):
return self.session.transport.send(
"POST",
"session/{session_id}/frame".format(**vars(self.session)),
{"id": frame},
encoder=webdriver.protocol.Encoder,
decoder=webdriver.protocol.Decoder,
session=self.session,
)
def switch_frame(self, frame):
self.session.switch_frame(frame)
async def load_page_and_wait_for_iframe(
self, url, finder, loads=1, timeout=None, **kwargs
):
while loads > 0:
await self.navigate(url, **kwargs)
frame = self.await_element(finder, timeout=timeout)
loads -= 1
self.switch_frame(frame)
return frame
def promise_bidi_event(self, event_name: str, check_fn=None, timeout=5):
future = self.event_loop.create_future()
async def on_event(method, data):
print("on_event", method, data)
if check_fn is not None and not check_fn(method, data):
return
remove_listener()
future.set_result(data)
remove_listener = self.session.bidi_session.add_event_listener(
event_name, on_event
)
return asyncio.wait_for(future, timeout=timeout)
def promise_console_message(self, msg):
def check_messages(method, data):
if "text" in data:
if msg in data["text"]:
return True
if "args" in data and len(data["args"]) and "value" in data["args"][0]:
if msg in data["args"][0]["value"]:
return True
return self.promise_bidi_event("log.entryAdded", check_messages)
def execute_script(self, script, *args):
return self.session.execute_script(script, args=args)
def execute_async_script(self, script, *args, **kwargs):
return self.session.execute_async_script(script, args, **kwargs)
def clear_all_cookies(self):
self.session.transport.send(
"DELETE", "session/%s/cookie" % self.session.session_id
)
def _do_is_displayed_check(self, ele, is_displayed):
if ele is None:
return None
if type(ele) in [list, tuple]:
return [x for x in ele if self._do_is_displayed_check(x, is_displayed)]
if is_displayed is False and ele and self.is_displayed(ele):
return None
if is_displayed is True and ele and not self.is_displayed(ele):
return None
return ele
def find_css(self, *args, all=False, is_displayed=None, **kwargs):
try:
ele = self.session.find.css(*args, all=all, **kwargs)
return self._do_is_displayed_check(ele, is_displayed)
except webdriver.error.NoSuchElementException:
return None
def find_xpath(self, xpath, all=False, is_displayed=None):
route = "elements" if all else "element"
body = {"using": "xpath", "value": xpath}
try:
ele = self.session.send_session_command("POST", route, body)
return self._do_is_displayed_check(ele, is_displayed)
except webdriver.error.NoSuchElementException:
return None
def find_text(self, text, is_displayed=None, **kwargs):
try:
ele = self.find_xpath(f"//*[contains(text(),'{text}')]", **kwargs)
return self._do_is_displayed_check(ele, is_displayed)
except webdriver.error.NoSuchElementException:
return None
def find_element(self, finder, is_displayed=None, **kwargs):
ele = finder.find(self, **kwargs)
return self._do_is_displayed_check(ele, is_displayed)
def await_css(self, selector, **kwargs):
return self.await_element(self.css(selector), **kwargs)
def await_xpath(self, selector, **kwargs):
return self.await_element(self.xpath(selector), **kwargs)
def await_text(self, selector, *args, **kwargs):
return self.await_element(self.text(selector), **kwargs)
def await_element(self, finder, **kwargs):
return self.await_first_element_of([finder], **kwargs)[0]
class css:
def __init__(self, selector):
self.selector = selector
def find(self, client, **kwargs):
return client.find_css(self.selector, **kwargs)
class xpath:
def __init__(self, selector):
self.selector = selector
def find(self, client, **kwargs):
return client.find_xpath(self.selector, **kwargs)
class text:
def __init__(self, selector):
self.selector = selector
def find(self, client, **kwargs):
return client.find_text(self.selector, **kwargs)
def await_first_element_of(self, finders, timeout=None, delay=0.25, **kwargs):
t0 = time.time()
if timeout is None:
timeout = 10
found = [None for finder in finders]
exc = None
while time.time() < t0 + timeout:
for i, finder in enumerate(finders):
try:
result = finder.find(self, **kwargs)
if result:
found[i] = result
return found
except webdriver.error.NoSuchElementException as e:
exc = e
time.sleep(delay)
raise exc if exc is not None else webdriver.error.NoSuchElementException
return found
async def dom_ready(self, timeout=None):
if timeout is None:
timeout = 20
async def wait():
return self.session.execute_async_script(
"""
const cb = arguments[0];
setInterval(() => {
if (document.readyState === "complete") {
cb();
}
}, 500);
"""
)
task = asyncio.create_task(wait())
return await asyncio.wait_for(task, timeout)
def is_float_cleared(self, elem1, elem2):
return self.session.execute_script(
"""return (function(a, b) {
// Ensure that a is placed under b (and not to its right)
return a?.offsetTop >= b?.offsetTop + b?.offsetHeight &&
a?.offsetLeft < b?.offsetLeft + b?.offsetWidth;
}(arguments[0], arguments[1]));""",
elem1,
elem2,
)
@contextlib.contextmanager
def assert_getUserMedia_called(self):
self.execute_script(
"""
navigator.mediaDevices.getUserMedia =
navigator.mozGetUserMedia =
navigator.getUserMedia =
() => { window.__gumCalled = true; };
"""
)
yield
assert self.execute_script("return window.__gumCalled === true;")
def await_element_hidden(self, finder, timeout=None, delay=0.25):
t0 = time.time()
if timeout is None:
timeout = 20
elem = finder.find(self)
while time.time() < t0 + timeout:
try:
if self.is_displayed(elem):
time.sleep(delay)
except webdriver.error.StaleElementReferenceException:
return
def is_displayed(self, element):
if element is None:
return False
return self.session.execute_script(
"""
const e = arguments[0],
s = window.getComputedStyle(e),
v = s.visibility === "visible",
o = Math.abs(parseFloat(s.opacity));
return e.getClientRects().length && v && (isNaN(o) || o === 1.0);
""",
args=[element],
)