from copy import copy, deepcopy
import inspect
import logging
from typing import (
Any,
Callable,
Dict,
Optional,
Tuple,
Union,
)
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.serve.context import get_global_client
from ray.dag.dag_node import DAGNodeBase
from ray.dag.class_node import ClassNode
from ray.dag.function_node import FunctionNode
from ray.serve.config import (
AutoscalingConfig,
DeploymentConfig,
)
from ray.serve._private.constants import SERVE_LOGGER_NAME, MIGRATION_MESSAGE
from ray.serve.handle import RayServeHandle, RayServeSyncHandle
from ray.serve._private.utils import DEFAULT, Default, guarded_deprecation_warning
from ray.util.annotations import Deprecated, PublicAPI
from ray.serve.schema import (
RayActorOptionsSchema,
DeploymentSchema,
)
logger = logging.getLogger(SERVE_LOGGER_NAME)
[docs]@PublicAPI(stability="beta")
class Application(DAGNodeBase):
"""One or more deployments bound with arguments that can be deployed together.
Can be passed into another `Deployment.bind()` to compose multiple deployments in a
single application, passed to `serve.run`, or deployed via a Serve config file.
For example, to define an Application and run it in Python:
.. code-block:: python
from ray import serve
from ray.serve import Application
@serve.deployment
class MyDeployment:
pass
app: Application = MyDeployment.bind(OtherDeployment.bind())
serve.run(app)
To run the same app using the command line interface (CLI):
.. code-block:: bash
serve run python_file:app
To deploy the same app via a config file:
.. code-block:: yaml
applications:
my_app:
import_path: python_file:app
"""
def __init__(
self, *, _internal_dag_node: Optional[Union[ClassNode, FunctionNode]] = None
):
if _internal_dag_node is None:
raise RuntimeError("This class should not be constructed directly.")
self._internal_dag_node = _internal_dag_node
def _get_internal_dag_node(self) -> Union[ClassNode, FunctionNode]:
if self._internal_dag_node is None:
raise RuntimeError("Application object should not be constructed directly.")
return self._internal_dag_node
@classmethod
def _from_internal_dag_node(cls, dag_node: Union[ClassNode, FunctionNode]):
return cls(_internal_dag_node=dag_node)
# Proxy all method calls to the underlying DAG node. This allows this class to be
# passed in place of the ClassNode or FunctionNode in the DAG building code.
def __getattr__(self, name: str) -> Any:
return getattr(self._get_internal_dag_node(), name)
[docs]@PublicAPI
class Deployment:
"""Class (or function) decorated with the `@serve.deployment` decorator.
This is run on a number of replica actors. Requests to those replicas call
this class.
One or more deployments can be composed together into an `Application` which is
then run via `serve.run` or a config file.
Example:
.. code-block:: python
@serve.deployment
class MyDeployment:
def __init__(self, name: str):
self._name = name
def __call__(self, request):
return "Hello world!"
app = MyDeployment.bind()
# Run via `serve.run` or the `serve run` CLI command.
serve.run(app)
"""
def __init__(
self,
func_or_class: Union[Callable, str],
name: str,
config: DeploymentConfig,
version: Optional[str] = None,
init_args: Optional[Tuple[Any]] = None,
init_kwargs: Optional[Tuple[Any]] = None,
route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE,
ray_actor_options: Optional[Dict] = None,
is_driver_deployment: Optional[bool] = False,
_internal=False,
) -> None:
if not _internal:
raise RuntimeError(
"The Deployment constructor should not be called "
"directly. Use `@serve.deployment` instead."
)
if not callable(func_or_class) and not isinstance(func_or_class, str):
raise TypeError("@serve.deployment must be called on a class or function.")
if not isinstance(name, str):
raise TypeError("name must be a string.")
if not (version is None or isinstance(version, str)):
raise TypeError("version must be a string.")
if not (init_args is None or isinstance(init_args, (tuple, list))):
raise TypeError("init_args must be a tuple.")
if not (init_kwargs is None or isinstance(init_kwargs, dict)):
raise TypeError("init_kwargs must be a dict.")
if route_prefix is not DEFAULT.VALUE and route_prefix is not None:
if not isinstance(route_prefix, str):
raise TypeError("route_prefix must be a string.")
if not route_prefix.startswith("/"):
raise ValueError("route_prefix must start with '/'.")
if route_prefix != "/" and route_prefix.endswith("/"):
raise ValueError(
"route_prefix must not end with '/' unless it's the root."
)
if "{" in route_prefix or "}" in route_prefix:
raise ValueError("route_prefix may not contain wildcards.")
if not (ray_actor_options is None or isinstance(ray_actor_options, dict)):
raise TypeError("ray_actor_options must be a dict.")
if is_driver_deployment is True:
if config.num_replicas != 1:
raise ValueError("num_replicas should not be set for driver deployment")
if config.autoscaling_config:
raise ValueError("autoscaling should not be set for driver deployment")
if init_args is None:
init_args = ()
if init_kwargs is None:
init_kwargs = {}
docs_path = None
if (
inspect.isclass(func_or_class)
and hasattr(func_or_class, "__module__")
and func_or_class.__module__ == "ray.serve.api"
and hasattr(func_or_class, "__fastapi_docs_path__")
):
docs_path = func_or_class.__fastapi_docs_path__
self._func_or_class = func_or_class
self._name = name
self._version = version
self._config = config
self._init_args = init_args
self._init_kwargs = init_kwargs
self._route_prefix = route_prefix
self._ray_actor_options = ray_actor_options
self._is_driver_deployment = is_driver_deployment
self._docs_path = docs_path
@property
def name(self) -> str:
"""Unique name of this deployment."""
return self._name
@property
def version(self) -> Optional[str]:
return self._version
@property
def func_or_class(self) -> Union[Callable, str]:
"""Underlying class or function that this deployment wraps."""
return self._func_or_class
@property
def num_replicas(self) -> int:
"""Current target number of replicas."""
return self._config.num_replicas
@property
def user_config(self) -> Any:
"""Current dynamic user-provided config options."""
return self._config.user_config
@property
def max_concurrent_queries(self) -> int:
"""Current max outstanding queries from each handle."""
return self._config.max_concurrent_queries
@property
def route_prefix(self) -> Optional[str]:
"""HTTP route prefix that this deployment is exposed under."""
if self._route_prefix is DEFAULT.VALUE:
return f"/{self._name}"
return self._route_prefix
@property
def ray_actor_options(self) -> Optional[Dict]:
"""Actor options such as resources required for each replica."""
return self._ray_actor_options
@property
def init_args(self) -> Tuple[Any]:
return self._init_args
@property
def init_kwargs(self) -> Tuple[Any]:
return self._init_kwargs
@property
def url(self) -> Optional[str]:
if self._route_prefix is None or self._is_driver_deployment:
# this deployment is not exposed over HTTP
return None
return get_global_client().root_url + self.route_prefix
def __call__(self):
raise RuntimeError(
"Deployments cannot be constructed directly. "
"Use `deployment.deploy() instead.`"
)
[docs] @PublicAPI(stability="beta")
def bind(self, *args, **kwargs) -> Application:
"""Bind the arguments to the deployment and return an Application.
The returned Application can be deployed using `serve.run` (or via
config file) or bound to another deployment for composition.
"""
copied_self = copy(self)
copied_self._func_or_class = "dummy.module"
schema_shell = deployment_to_schema(copied_self)
if inspect.isfunction(self._func_or_class):
dag_node = FunctionNode(
self._func_or_class,
args, # Used to bind and resolve DAG only, can take user input
kwargs, # Used to bind and resolve DAG only, can take user input
self._ray_actor_options or dict(),
other_args_to_resolve={
"deployment_schema": schema_shell,
"is_from_serve_deployment": True,
},
)
else:
dag_node = ClassNode(
self._func_or_class,
args,
kwargs,
cls_options=self._ray_actor_options or dict(),
other_args_to_resolve={
"deployment_schema": schema_shell,
"is_from_serve_deployment": True,
},
)
return Application._from_internal_dag_node(dag_node)
[docs] @guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
@Deprecated(message=MIGRATION_MESSAGE)
def deploy(self, *init_args, _blocking=True, **init_kwargs):
"""Deploy or update this deployment.
Args:
init_args: args to pass to the class __init__
method. Not valid if this deployment wraps a function.
init_kwargs: kwargs to pass to the class __init__
method. Not valid if this deployment wraps a function.
"""
record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
self._deploy(*init_args, _blocking=_blocking, **init_kwargs)
# TODO(Sihan) Promote the _deploy to deploy after we fully deprecate the API
def _deploy(self, *init_args, _blocking=True, **init_kwargs):
"""Deploy or update this deployment.
Args:
init_args: args to pass to the class __init__
method. Not valid if this deployment wraps a function.
init_kwargs: kwargs to pass to the class __init__
method. Not valid if this deployment wraps a function.
"""
if len(init_args) == 0 and self._init_args is not None:
init_args = self._init_args
if len(init_kwargs) == 0 and self._init_kwargs is not None:
init_kwargs = self._init_kwargs
return get_global_client().deploy(
self._name,
self._func_or_class,
init_args,
init_kwargs,
ray_actor_options=self._ray_actor_options,
config=self._config,
version=self._version,
route_prefix=self.route_prefix,
url=self.url,
_blocking=_blocking,
)
[docs] @guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
@Deprecated(message=MIGRATION_MESSAGE)
def delete(self):
"""Delete this deployment."""
return self._delete()
# TODO(Sihan) Promote the _delete to delete after we fully deprecate the API
def _delete(self):
"""Delete this deployment."""
return get_global_client().delete_deployments([self._name])
[docs] @guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
@Deprecated(message=MIGRATION_MESSAGE)
def get_handle(
self, sync: Optional[bool] = True
) -> Union[RayServeHandle, RayServeSyncHandle]:
"""Get a ServeHandle to this deployment to invoke it from Python.
Args:
sync: If true, then Serve will return a ServeHandle that
works everywhere. Otherwise, Serve will return an
asyncio-optimized ServeHandle that's only usable in an asyncio
loop.
Returns:
ServeHandle
"""
return self._get_handle(sync)
# TODO(Sihan) Promote the _get_handle to get_handle after we fully deprecate the API
def _get_handle(
self,
sync: Optional[bool] = True,
) -> Union[RayServeHandle, RayServeSyncHandle]:
"""Get a ServeHandle to this deployment to invoke it from Python.
Args:
sync: If true, then Serve will return a ServeHandle that
works everywhere. Otherwise, Serve will return an
asyncio-optimized ServeHandle that's only usable in an asyncio
loop.
Returns:
ServeHandle
"""
return get_global_client().get_handle(
self._name,
missing_ok=True,
sync=sync,
)
[docs] @PublicAPI
def options(
self,
func_or_class: Optional[Callable] = None,
name: Default[str] = DEFAULT.VALUE,
version: Default[str] = DEFAULT.VALUE,
num_replicas: Default[Optional[int]] = DEFAULT.VALUE,
init_args: Default[Tuple[Any]] = DEFAULT.VALUE,
init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE,
route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
ray_actor_options: Default[Optional[Dict]] = DEFAULT.VALUE,
user_config: Default[Optional[Any]] = DEFAULT.VALUE,
max_concurrent_queries: Default[int] = DEFAULT.VALUE,
autoscaling_config: Default[
Union[Dict, AutoscalingConfig, None]
] = DEFAULT.VALUE,
graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
health_check_period_s: Default[float] = DEFAULT.VALUE,
health_check_timeout_s: Default[float] = DEFAULT.VALUE,
is_driver_deployment: bool = DEFAULT.VALUE,
_internal: bool = False,
) -> "Deployment":
"""Return a copy of this deployment with updated options.
Only those options passed in will be updated, all others will remain
unchanged from the existing deployment.
Refer to the `@serve.deployment` decorator docs for available arguments.
"""
# NOTE: The user_configured_option_names should be the first thing that's
# defined in this method. It depends on the locals() dictionary storing
# only the function args/kwargs.
# Create list of all user-configured options from keyword args
user_configured_option_names = [
option
for option, value in locals().items()
if option not in {"self", "func_or_class", "_internal"}
and value is not DEFAULT.VALUE
]
new_config = deepcopy(self._config)
if not _internal:
new_config.user_configured_option_names.update(user_configured_option_names)
if num_replicas not in [DEFAULT.VALUE, None] and autoscaling_config not in [
DEFAULT.VALUE,
None,
]:
raise ValueError(
"Manually setting num_replicas is not allowed when "
"autoscaling_config is provided."
)
if num_replicas == 0:
raise ValueError("num_replicas is expected to larger than 0")
if not _internal and version is not DEFAULT.VALUE:
logger.warning(
"DeprecationWarning: `version` in `Deployment.options()` has been "
"deprecated. Explicitly specifying version will raise an error in the "
"future!"
)
if num_replicas not in [DEFAULT.VALUE, None]:
new_config.num_replicas = num_replicas
if user_config is not DEFAULT.VALUE:
new_config.user_config = user_config
if max_concurrent_queries is not DEFAULT.VALUE:
new_config.max_concurrent_queries = max_concurrent_queries
if func_or_class is None:
func_or_class = self._func_or_class
if name is DEFAULT.VALUE:
name = self._name
if version is DEFAULT.VALUE:
version = self._version
if init_args is DEFAULT.VALUE:
init_args = self._init_args
if init_kwargs is DEFAULT.VALUE:
init_kwargs = self._init_kwargs
if route_prefix is DEFAULT.VALUE:
# Default is to keep the previous value
route_prefix = self._route_prefix
if ray_actor_options is DEFAULT.VALUE:
ray_actor_options = self._ray_actor_options
if autoscaling_config is not DEFAULT.VALUE:
new_config.autoscaling_config = autoscaling_config
if graceful_shutdown_wait_loop_s is not DEFAULT.VALUE:
new_config.graceful_shutdown_wait_loop_s = graceful_shutdown_wait_loop_s
if graceful_shutdown_timeout_s is not DEFAULT.VALUE:
new_config.graceful_shutdown_timeout_s = graceful_shutdown_timeout_s
if health_check_period_s is not DEFAULT.VALUE:
new_config.health_check_period_s = health_check_period_s
if health_check_timeout_s is not DEFAULT.VALUE:
new_config.health_check_timeout_s = health_check_timeout_s
if is_driver_deployment is DEFAULT.VALUE:
is_driver_deployment = self._is_driver_deployment
return Deployment(
func_or_class,
name,
new_config,
version=version,
init_args=init_args,
init_kwargs=init_kwargs,
route_prefix=route_prefix,
ray_actor_options=ray_actor_options,
_internal=True,
is_driver_deployment=is_driver_deployment,
)
[docs] @PublicAPI(stability="alpha")
def set_options(
self,
func_or_class: Optional[Callable] = None,
name: Default[str] = DEFAULT.VALUE,
version: Default[str] = DEFAULT.VALUE,
num_replicas: Default[Optional[int]] = DEFAULT.VALUE,
init_args: Default[Tuple[Any]] = DEFAULT.VALUE,
init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE,
route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
ray_actor_options: Default[Optional[Dict]] = DEFAULT.VALUE,
user_config: Default[Optional[Any]] = DEFAULT.VALUE,
max_concurrent_queries: Default[int] = DEFAULT.VALUE,
autoscaling_config: Default[
Union[Dict, AutoscalingConfig, None]
] = DEFAULT.VALUE,
graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
health_check_period_s: Default[float] = DEFAULT.VALUE,
health_check_timeout_s: Default[float] = DEFAULT.VALUE,
is_driver_deployment: bool = DEFAULT.VALUE,
_internal: bool = False,
) -> None:
"""Overwrite this deployment's options in-place.
Only those options passed in will be updated, all others will remain
unchanged.
Refer to the @serve.deployment decorator docstring for all non-private
arguments.
"""
validated = self.options(
func_or_class=func_or_class,
name=name,
version=version,
init_args=init_args,
init_kwargs=init_kwargs,
route_prefix=route_prefix,
num_replicas=num_replicas,
ray_actor_options=ray_actor_options,
user_config=user_config,
max_concurrent_queries=max_concurrent_queries,
autoscaling_config=autoscaling_config,
graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s,
graceful_shutdown_timeout_s=graceful_shutdown_timeout_s,
health_check_period_s=health_check_period_s,
health_check_timeout_s=health_check_timeout_s,
_internal=_internal,
is_driver_deployment=is_driver_deployment,
)
self._func_or_class = validated._func_or_class
self._name = validated._name
self._version = validated._version
self._init_args = validated._init_args
self._init_kwargs = validated._init_kwargs
self._route_prefix = validated._route_prefix
self._ray_actor_options = validated._ray_actor_options
self._config = validated._config
def __eq__(self, other):
return all(
[
self._name == other._name,
self._version == other._version,
self._config == other._config,
self._init_args == other._init_args,
self._init_kwargs == other._init_kwargs,
# compare route prefix with default value resolved
self.route_prefix == other.route_prefix,
self._ray_actor_options == self._ray_actor_options,
]
)
def __str__(self):
return (
f"Deployment(name={self._name},"
f"version={self._version},"
f"route_prefix={self.route_prefix})"
)
def __repr__(self):
return str(self)
def deployment_to_schema(
d: Deployment, include_route_prefix: bool = True
) -> DeploymentSchema:
"""Converts a live deployment object to a corresponding structured schema.
Args:
d: Deployment object to convert
include_route_prefix: Whether to include the route_prefix in the returned
schema. This should be set to False if the schema will be included in a
higher-level object describing an application, and you want to place
route_prefix at the application level.
"""
if d.ray_actor_options is not None:
ray_actor_options_schema = RayActorOptionsSchema.parse_obj(d.ray_actor_options)
else:
ray_actor_options_schema = None
deployment_options = {
"name": d.name,
"num_replicas": None if d._config.autoscaling_config else d.num_replicas,
"max_concurrent_queries": d.max_concurrent_queries,
"user_config": d.user_config,
"autoscaling_config": d._config.autoscaling_config,
"graceful_shutdown_wait_loop_s": d._config.graceful_shutdown_wait_loop_s,
"graceful_shutdown_timeout_s": d._config.graceful_shutdown_timeout_s,
"health_check_period_s": d._config.health_check_period_s,
"health_check_timeout_s": d._config.health_check_timeout_s,
"ray_actor_options": ray_actor_options_schema,
"is_driver_deployment": d._is_driver_deployment,
}
if include_route_prefix:
deployment_options["route_prefix"] = d.route_prefix
# Let non-user-configured options be set to defaults. If the schema
# is converted back to a deployment, this lets Serve continue tracking
# which options were set by the user. Name is a required field in the
# schema, so it should be passed in explicitly.
for option in list(deployment_options.keys()):
if option != "name" and option not in d._config.user_configured_option_names:
del deployment_options[option]
# TODO(Sihan) DeploymentConfig num_replicas and auto_config can be set together
# because internally we use these two field for autoscale and deploy.
# We can improve the code after we separate the user faced deployment config and
# internal deployment config.
return DeploymentSchema(**deployment_options)
def schema_to_deployment(s: DeploymentSchema) -> Deployment:
"""Creates a deployment with parameters specified in schema.
The returned deployment CANNOT be deployed immediately. It's func_or_class
value is an empty string (""), which is not a valid import path. The
func_or_class value must be overwritten with a valid function or class
before the deployment can be deployed.
"""
if s.ray_actor_options is DEFAULT.VALUE:
ray_actor_options = None
else:
ray_actor_options = s.ray_actor_options.dict(exclude_unset=True)
if s.is_driver_deployment is DEFAULT.VALUE:
is_driver_deployment = False
else:
is_driver_deployment = s.is_driver_deployment
config = DeploymentConfig.from_default(
num_replicas=s.num_replicas,
user_config=s.user_config,
max_concurrent_queries=s.max_concurrent_queries,
autoscaling_config=s.autoscaling_config,
graceful_shutdown_wait_loop_s=s.graceful_shutdown_wait_loop_s,
graceful_shutdown_timeout_s=s.graceful_shutdown_timeout_s,
health_check_period_s=s.health_check_period_s,
health_check_timeout_s=s.health_check_timeout_s,
)
config.user_configured_option_names = s.get_user_configured_option_names()
return Deployment(
func_or_class="",
name=s.name,
config=config,
init_args=(),
init_kwargs={},
route_prefix=s.route_prefix,
ray_actor_options=ray_actor_options,
_internal=True,
is_driver_deployment=is_driver_deployment,
)