import asyncio
import concurrent.futures
from dataclasses import dataclass
from functools import wraps
import inspect
import threading
from typing import Coroutine, Optional, Union
import ray
from ray._private.utils import get_or_create_event_loop
from ray import serve
from ray.serve._private.common import EndpointTag
from ray.serve._private.constants import (
RAY_SERVE_ENABLE_NEW_ROUTING,
)
from ray.serve._private.utils import (
get_random_letters,
DEFAULT,
)
from ray.serve._private.router import Router, RequestMetadata
from ray.util import metrics
from ray.util.annotations import Deprecated, PublicAPI
# Process-wide event loop used by sync handles; it is created lazily and run in a
# background daemon thread by `_create_or_get_async_loop_in_thread`.
_global_async_loop: Optional[asyncio.AbstractEventLoop] = None
def _wrap_into_async_task(async_func):
"""Wrap an async function so it returns async task instead of coroutine
This makes the returned value awaitable more than once.
"""
assert inspect.iscoroutinefunction(async_func)
@wraps(async_func)
def wrapper(*args, **kwargs):
return asyncio.ensure_future(async_func(*args, **kwargs))
return wrapper
# Serializes creation of `_global_async_loop` so that concurrent first callers
# share one loop instead of each starting (and leaking) their own loop + thread.
_global_async_loop_creation_lock = threading.Lock()


def _create_or_get_async_loop_in_thread():
    """Return the process-wide asyncio event loop used by sync handles.

    On first use this creates a new event loop, starts a daemon thread that runs
    it forever, and stores it in the module global `_global_async_loop`; later
    calls return that same loop. Creation uses double-checked locking so that
    concurrent first calls from several threads all receive the same loop.

    Returns:
        The running background `asyncio` event loop.
    """
    global _global_async_loop
    if _global_async_loop is None:
        with _global_async_loop_creation_lock:
            # Re-check under the lock: another thread may have created it while
            # this one was waiting.
            if _global_async_loop is None:
                loop = asyncio.new_event_loop()
                thread = threading.Thread(
                    daemon=True,
                    target=loop.run_forever,
                )
                thread.start()
                # Publish only after the loop's thread has been started.
                _global_async_loop = loop
    return _global_async_loop
@PublicAPI(stability="beta")
@dataclass(frozen=True)
class HandleOptions:
    """Immutable request options carried by a ServeHandle.

    The dataclass is frozen, so options are never modified in place; calling
    `.options()` on a handle builds a new instance through `copy_and_update`.
    """

    # Name of the deployment method that requests are routed to.
    method_name: str = "__call__"
    # Model ID attached to each request (used for model multiplexing); the
    # empty-string default presumably means "no model ID" — confirm in router.
    multiplexed_model_id: str = ""
    # Forwarded as `is_streaming` in each request's metadata.
    stream: bool = False

    def copy_and_update(
        self,
        method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
        multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
        stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
    ) -> "HandleOptions":
        """Return a new `HandleOptions` with selected fields overridden.

        Each argument left at `DEFAULT.VALUE` inherits the corresponding value
        from `self`; any other value replaces it.
        """

        def _resolve(override, current):
            # DEFAULT.VALUE is the "argument not provided" sentinel.
            return current if override == DEFAULT.VALUE else override

        return HandleOptions(
            method_name=_resolve(method_name, self.method_name),
            multiplexed_model_id=_resolve(
                multiplexed_model_id, self.multiplexed_model_id
            ),
            stream=_resolve(stream, self.stream),
        )
@PublicAPI(stability="beta")
class RayServeHandle:
    """A handle used to make requests from one deployment to another.

    This is used to compose multiple deployments into a single application. After
    building the application, this handle is substituted at runtime for deployments
    passed as arguments via `.bind()`.

    Example:

    .. code-block:: python

        import ray
        from ray import serve
        from ray.serve.handle import RayServeHandle, RayServeSyncHandle

        @serve.deployment
        class Downstream:
            def __init__(self, message: str):
                self._message = message

            def __call__(self, name: str) -> str:
                return self._message + name

        @serve.deployment
        class Ingress:
            def __init__(self, handle: RayServeHandle):
                self._handle = handle

            async def __call__(self, name: str) -> str:
                obj_ref: ray.ObjectRef = await self._handle.remote(name)
                return await obj_ref

        app = Ingress.bind(Downstream.bind("Hello "))
        handle: RayServeSyncHandle = serve.run(app)

        # Prints "Hello Mr. Magoo"
        print(ray.get(handle.remote("Mr. Magoo")))
    """

    def __init__(
        self,
        deployment_name: EndpointTag,
        *,
        handle_options: Optional[HandleOptions] = None,
        _router: Optional[Router] = None,
        _is_for_http_requests: bool = False,
    ):
        """Create a handle to `deployment_name`.

        Args:
            deployment_name: The deployment that requests are sent to.
            handle_options: Per-handle options; defaults to `HandleOptions()`.
            _router: An existing router to reuse. If None, one is created
                lazily on first use by `_get_or_create_router`.
            _is_for_http_requests: Forwarded as `is_http_request` in each
                request's metadata.
        """
        self.deployment_name = deployment_name
        self.handle_options = handle_options or HandleOptions()
        self._is_for_http_requests = _is_for_http_requests

        self.request_counter = metrics.Counter(
            "serve_handle_request_counter",
            description=(
                "The number of handle.remote() calls that have been "
                "made on this handle."
            ),
            tag_keys=("handle", "deployment", "route", "application"),
        )
        # A random suffix keeps the "handle" tag distinct between handles to the
        # same deployment.
        handle_tag = f"{self.deployment_name}#{get_random_letters()}"
        self.request_counter.set_default_tags(
            {"handle": handle_tag, "deployment": self.deployment_name}
        )

        self._router: Optional[Router] = _router

    def _get_or_create_router(self) -> Router:
        """Return this handle's router, creating it on first use.

        The router is bound to the event loop returned by
        `get_or_create_event_loop()` in the thread that first creates it.
        """
        if self._router is None:
            self._router = Router(
                serve.context.get_global_client()._controller,
                self.deployment_name,
                event_loop=get_or_create_event_loop(),
                _use_new_routing=RAY_SERVE_ENABLE_NEW_ROUTING,
            )
        return self._router

    @property
    def _is_same_loop(self) -> bool:
        """Whether the caller's asyncio loop is the same loop for handle.

        This is only useful for async handles. Reading this property creates
        the router if it does not exist yet.
        """
        return get_or_create_event_loop() == self._get_or_create_router()._event_loop

    def _options(
        self,
        *,
        method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
        multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
        stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
    ):
        """Return a copy of this handle (same class) with updated options.

        NOTE: the copy reuses this handle's router only if it has already been
        created; otherwise the copy lazily creates a router of its own.
        """
        new_handle_options = self.handle_options.copy_and_update(
            method_name=method_name,
            multiplexed_model_id=multiplexed_model_id,
            stream=stream,
        )
        return self.__class__(
            self.deployment_name,
            handle_options=new_handle_options,
            _router=self._router,
            _is_for_http_requests=self._is_for_http_requests,
        )

    def options(
        self,
        *,
        method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
        multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
        stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
    ) -> "RayServeHandle":
        """Set options for this handle and return an updated copy of it.

        Example:

        .. code-block:: python

            # The following two lines are equivalent:
            obj_ref = await handle.other_method.remote(*args)
            obj_ref = await handle.options(method_name="other_method").remote(*args)
            obj_ref = await handle.options(
                multiplexed_model_id="model:v1").remote(*args)
        """
        return self._options(
            method_name=method_name,
            multiplexed_model_id=multiplexed_model_id,
            stream=stream,
        )

    def _remote(self, deployment_name, handle_options, args, kwargs) -> Coroutine:
        """Build request metadata and return the (un-awaited) assignment coroutine.

        Reads the current Serve request context, increments the request
        counter tagged with its route and application, and returns the
        router's `assign_request(...)` coroutine for the caller to run.
        """
        _request_context = ray.serve.context._serve_request_context.get()
        request_metadata = RequestMetadata(
            _request_context.request_id,
            deployment_name,
            call_method=handle_options.method_name,
            is_http_request=self._is_for_http_requests,
            route=_request_context.route,
            app_name=_request_context.app_name,
            multiplexed_model_id=handle_options.multiplexed_model_id,
            is_streaming=handle_options.stream,
        )
        self.request_counter.inc(
            tags={
                "route": _request_context.route,
                "application": _request_context.app_name,
            }
        )
        return self._get_or_create_router().assign_request(
            request_metadata, *args, **kwargs
        )

    @_wrap_into_async_task
    async def remote(self, *args, **kwargs) -> asyncio.Task:
        """Issue an asynchronous request to the __call__ method of the deployment.

        Returns an `asyncio.Task` whose underlying result is a Ray ObjectRef that
        points to the final result of the request.

        The final result can be retrieved by awaiting the ObjectRef.

        Example:

        .. code-block:: python

            obj_ref = await handle.remote(*args)
            result = await obj_ref
        """
        return await self._remote(
            self.deployment_name, self.handle_options, args, kwargs
        )

    def __repr__(self):
        return f"{self.__class__.__name__}" f"(deployment='{self.deployment_name}')"

    @classmethod
    def _deserialize(cls, kwargs):
        """Required for this class's __reduce__ method to be picklable."""
        return cls(**kwargs)

    def __reduce__(self):
        # The router and metrics counter are not serialized: the restored handle
        # builds a fresh counter in __init__ and a router lazily on first use.
        serialized_data = {
            "deployment_name": self.deployment_name,
            "handle_options": self.handle_options,
            "_is_for_http_requests": self._is_for_http_requests,
        }
        return RayServeHandle._deserialize, (serialized_data,)

    def __getattr__(self, name):
        """Return a copy of this handle targeting deployment method `name`.

        `handle.other_method` is shorthand for
        `handle.options(method_name="other_method")`.

        Dunder names other than `__call__` raise AttributeError instead. Python
        protocols probe optional hooks on instances (e.g. `copy.deepcopy` does
        `getattr(x, "__deepcopy__", None)` and then calls the result), and
        answering those probes with a new, non-callable handle breaks them.
        """
        if name.startswith("__") and name.endswith("__") and name != "__call__":
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return self.options(method_name=name)

    def shutdown(self):
        """Shut down this handle's router, if one has been created."""
        if self._router is not None:
            self._router.shutdown()
@PublicAPI(stability="beta")
class RayServeSyncHandle(RayServeHandle):
    """A handle used to make requests to the ingress deployment of an application.

    This is returned by `serve.run` and can be used to invoke the application from
    Python rather than over HTTP. For example:

    .. code-block:: python

        import ray
        from ray import serve
        from ray.serve.handle import RayServeSyncHandle

        @serve.deployment
        class Ingress:
            def __call__(self, name: str) -> str:
                return f"Hello {name}"

        app = Ingress.bind()
        handle: RayServeSyncHandle = serve.run(app)

        # Prints "Hello Mr. Magoo"
        print(ray.get(handle.remote("Mr. Magoo")))
    """

    @property
    def _is_same_loop(self) -> bool:
        # NOTE(simon): For sync handle, the caller doesn't have to be in the
        # same loop as the handle's loop, so we always return True here.
        return True

    def _get_or_create_router(self) -> Router:
        """Return this handle's router, creating it on first use.

        Unlike the async handle, the router is bound to the process-wide
        background loop from `_create_or_get_async_loop_in_thread`, so the
        calling thread does not need an event loop of its own.
        """
        if self._router is None:
            self._router = Router(
                serve.context.get_global_client()._controller,
                self.deployment_name,
                event_loop=_create_or_get_async_loop_in_thread(),
                _use_new_routing=RAY_SERVE_ENABLE_NEW_ROUTING,
            )
        return self._router

    def options(
        self,
        *,
        method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
        multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
        stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
    ) -> "RayServeSyncHandle":
        """Set options for this handle and return an updated copy of it.

        Example:

        .. code-block:: python

            # The following two lines are equivalent:
            obj_ref = handle.other_method.remote(*args)
            obj_ref = handle.options(method_name="other_method").remote(*args)
            obj_ref = handle.options(multiplexed_model_id="model1").remote(*args)
        """
        return self._options(
            method_name=method_name,
            multiplexed_model_id=multiplexed_model_id,
            stream=stream,
        )

    def remote(self, *args, **kwargs) -> ray.ObjectRef:
        """Issue an asynchronous request to the __call__ method of the deployment.

        Returns a Ray ObjectRef whose results can be waited for or retrieved
        using ray.wait or ray.get, respectively.

        The calling thread blocks until the router's `assign_request` coroutine
        has completed on the background event loop; the deployment's result is
        then obtained through the returned ObjectRef.

        .. code-block:: python

            obj_ref = handle.remote(*args)
            result = ray.get(obj_ref)
        """
        coro = self._remote(self.deployment_name, self.handle_options, args, kwargs)
        # The coroutine must run on the router's background loop (another thread);
        # hand it over and wait synchronously for the ObjectRef it produces.
        future: concurrent.futures.Future = asyncio.run_coroutine_threadsafe(
            coro, self._get_or_create_router()._event_loop
        )
        return future.result()

    def __reduce__(self):
        # Same payload as RayServeHandle.__reduce__, but restores a sync handle.
        serialized_data = {
            "deployment_name": self.deployment_name,
            "handle_options": self.handle_options,
            "_is_for_http_requests": self._is_for_http_requests,
        }
        return RayServeSyncHandle._deserialize, (serialized_data,)
@Deprecated(
    message="RayServeDeploymentHandle is no longer used, use RayServeHandle instead."
)
class RayServeDeploymentHandle(RayServeHandle):
    # Deprecated. Some examples used this class for type hinting; to avoid
    # breaking them, it is kept as an empty subclass of RayServeHandle (not a true
    # alias, so `RayServeDeploymentHandle is RayServeHandle` is False).
    # NOTE: RayServeHandle.__reduce__ names RayServeHandle explicitly, so a
    # pickled instance of this class is restored as a plain RayServeHandle.
    pass