Deploy Compositions of Models

This section helps you:

  • compose multiple deployments containing ML logic or business logic into a single application

  • independently scale and configure each of your ML models and business logic steps

Also check out the deployment graph API, a new experimental API under development for connecting Ray Serve deployments together.

Composing Deployments using ServeHandles

You can call deployment methods from within other deployments using the ServeHandle. This lets you divide your application’s steps (such as preprocessing, model inference, and post-processing) into independent deployments that can be independently scaled and configured.

To send a request to a deployment through a ServeHandle, call handle.remote. The request can carry ordinary Python args and kwargs, which are passed directly to the deployment method. The call returns a Ray ObjectRef, whose result you can wait for or retrieve using await or ray.get.

Model Composition Example

Here’s an example:

 1# File name: hello.py
 2import ray
 3from ray import serve
 4
 5
 6@serve.deployment
 7class LanguageClassifier:
 8    def __init__(self, spanish_responder, french_responder):
 9        self.spanish_responder = spanish_responder
10        self.french_responder = french_responder
11
12    async def __call__(self, http_request):
13        request = await http_request.json()
14        language, name = request["language"], request["name"]
15
16        if language == "spanish":
17            ref = await self.spanish_responder.say_hello.remote(name)
18        elif language == "french":
19            ref = await self.french_responder.say_hello.remote(name)
20        else:
21            return "Please try again."
22
23        return await ref
24
25
26@serve.deployment
27class SpanishResponder:
28    def say_hello(self, name: str):
29        return f"Hola {name}"
30
31
32@serve.deployment
33class FrenchResponder:
34    def say_hello(self, name: str):
35        return f"Bonjour {name}"
36
37
38spanish_responder = SpanishResponder.bind()
39french_responder = FrenchResponder.bind()
40language_classifier = LanguageClassifier.bind(spanish_responder, french_responder)

In line 40, the LanguageClassifier deployment takes in the spanish_responder and french_responder as constructor arguments. At runtime, these arguments are converted into ServeHandles. LanguageClassifier can then call the spanish_responder and french_responder’s deployment methods using this handle.

For example, the LanguageClassifier’s __call__ method uses the HTTP request’s values to decide whether to respond in Spanish or French. It then forwards the request’s name to the spanish_responder or the french_responder on lines 17 and 19 using the ServeHandles. The calls are formatted as:

await self.spanish_responder.say_hello.remote(name)

This call has a few parts:

  • await lets us issue an asynchronous request through the ServeHandle.

  • self.spanish_responder is the SpanishResponder handle taken in through the constructor.

  • say_hello is the SpanishResponder method to invoke.

  • remote indicates that this is a ServeHandle call to another deployment. It is required when invoking a deployment’s method from another deployment, and it is appended to the method name.

  • name is the argument for say_hello. You can pass any number of arguments or keyword arguments here.

This call returns a reference to the result, not the result itself. This pattern allows the call to execute asynchronously. To get the actual result, await the reference: await suspends the calling coroutine until the asynchronous call finishes, and then it returns the result. In this example, line 23 calls await ref and returns the resulting string. Note that getting the result takes two await statements in total. First, the code must await the ServeHandle call itself to retrieve a reference. Then it must await the reference to get the final result.
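The two-step await can be illustrated without Ray, using only asyncio. This is a minimal sketch of the pattern, not Serve's implementation: ToyHandle and its methods are hypothetical names invented for this illustration, and the asyncio.sleep(0) calls merely stand in for buffering and replica work.

```python
import asyncio


class ToyHandle:
    """Hypothetical stand-in for a handle; NOT part of Ray Serve."""

    def __init__(self, func):
        self._func = func

    def remote(self, *args, **kwargs) -> asyncio.Task:
        # Stage 1: submission. The returned task resolves to a reference.
        return asyncio.ensure_future(self._submit(args, kwargs))

    async def _submit(self, args, kwargs) -> asyncio.Future:
        await asyncio.sleep(0)  # stands in for time spent buffered
        # Stage 2: execution. The reference resolves to the result.
        return asyncio.ensure_future(self._execute(args, kwargs))

    async def _execute(self, args, kwargs):
        await asyncio.sleep(0)  # stands in for work done on a replica
        return self._func(*args, **kwargs)


async def main() -> str:
    handle = ToyHandle(lambda name: f"Hola {name}")
    ref = await handle.remote("Dora")  # first await: get the reference
    assert isinstance(ref, asyncio.Future)  # still not the result itself
    return await ref  # second await: get the result


result = asyncio.run(main())
print(result)  # Hola Dora
```

Dropping either await changes what the caller holds: without the first, it has a pending submission task; without the second, it has a reference rather than the string.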

Warning

You can use the ray.get(ref) method to get the return value of remote ServeHandle calls. However, calling ray.get from inside a deployment is an antipattern. It blocks the deployment from executing any other code until the call is finished. Using await lets the deployment process other requests while waiting for the ServeHandle call to finish. You should use await instead of ray.get inside deployments.
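The effect described in this warning can be reproduced with plain asyncio, under the assumption that a blocking time.sleep is a fair stand-in for ray.get and asyncio.sleep for await ref. The handler names below are hypothetical, and the sketch only records the order of events for two concurrent requests.

```python
import asyncio
import time


async def blocking_handler(i: int, log: list) -> None:
    log.append(f"start {i}")
    time.sleep(0.01)  # blocks the event loop (stand-in for ray.get)
    log.append(f"end {i}")


async def async_handler(i: int, log: list) -> None:
    log.append(f"start {i}")
    await asyncio.sleep(0.01)  # yields to the event loop (stand-in for await ref)
    log.append(f"end {i}")


async def run_two(handler) -> list:
    """Run two requests concurrently and return the order of events."""
    log = []
    await asyncio.gather(handler(0, log), handler(1, log))
    return log


blocking_log = asyncio.run(run_two(blocking_handler))
async_log = asyncio.run(run_two(async_handler))

print(blocking_log)  # ['start 0', 'end 0', 'start 1', 'end 1']
print(async_log)     # both requests start before either one ends
```

With the blocking call, the second request cannot even start until the first has finished. With await, the two requests overlap.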

You can copy the hello.py script above and run it with serve run. Make sure to run the command from a directory containing hello.py, so it can locate the script:

$ serve run hello:language_classifier

You can use this client script to interact with the example:

# File name: hello_client.py
import requests

response = requests.post(
    "http://localhost:8000", json={"language": "spanish", "name": "Dora"}
)
greeting = response.text
print(greeting)

While the serve run command is running, open a separate terminal window and run this script:

$ python hello_client.py

Hola Dora

Note

Composition lets you break apart your application and independently scale each part. For instance, suppose this LanguageClassifier application’s requests were 75% Spanish and 25% French. You could scale your SpanishResponder to have 3 replicas and your FrenchResponder to have 1 replica, so you could meet your workload’s demand. This flexibility also applies to reserving resources like CPUs and GPUs, as well as any other configurations you can set for each deployment.

With composition, you can avoid application-level bottlenecks when serving models and business logic steps that use different types and amounts of resources.
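The replica arithmetic in this note can be written out as a small helper. This is a sketch under stated assumptions: split_replicas is a hypothetical function, not a Serve API, and it uses a naive proportional split with rounding that keeps at least one replica per deployment, so the counts are not guaranteed to sum to the budget for every traffic mix. The resulting numbers would then be set through each deployment's num_replicas option.

```python
def split_replicas(traffic_share: dict, total_replicas: int) -> dict:
    """Split a replica budget in proportion to each deployment's traffic.

    Hypothetical helper: rounds each share and keeps at least one replica.
    """
    return {
        name: max(1, round(share * total_replicas))
        for name, share in traffic_share.items()
    }


# Traffic mix from the note: 75% Spanish, 25% French, 4 replicas in total.
replicas = split_replicas(
    {"SpanishResponder": 0.75, "FrenchResponder": 0.25}, total_replicas=4
)
print(replicas)  # {'SpanishResponder': 3, 'FrenchResponder': 1}
```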

ServeHandle Deep Dive

Conceptually, a ServeHandle is a client-side load balancer, routing requests to any replicas of a given deployment. Also, it performs buffering internally so it won’t overwhelm the replicas. Using the current number of requests buffered, it informs the autoscaler to scale up the number of replicas.

[Figure: architecture diagram of the ServeHandle]
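As a rough mental model of client-side load balancing with buffering, consider the following asyncio sketch. It is not how Serve is implemented: ToyRouter, make_replica, the round-robin policy, and the max_in_flight cap are all assumptions made for this illustration. The buffered counter plays the role of the signal that an autoscaler could read.

```python
import asyncio
import itertools


class ToyRouter:
    """Hypothetical round-robin router with a cap on in-flight requests."""

    def __init__(self, replicas, max_in_flight: int):
        self._replicas = itertools.cycle(replicas)
        self._slots = asyncio.Semaphore(max_in_flight)
        self.buffered = 0  # requests not yet assigned to a replica
        self.peak_buffered = 0  # highest value seen; a scale-up signal

    async def call(self, *args):
        self.buffered += 1
        self.peak_buffered = max(self.peak_buffered, self.buffered)
        await self._slots.acquire()  # wait here while all slots are busy
        self.buffered -= 1
        try:
            replica = next(self._replicas)  # pick the next replica in turn
            return await replica(*args)
        finally:
            self._slots.release()


def make_replica(name: str):
    async def replica(x):
        await asyncio.sleep(0.01)  # stands in for model inference
        return f"{name} handled {x}"

    return replica


async def main():
    router = ToyRouter(
        [make_replica("replica-0"), make_replica("replica-1")], max_in_flight=2
    )
    results = await asyncio.gather(*(router.call(i) for i in range(4)))
    return results, router.peak_buffered


results, peak_buffered = asyncio.run(main())
print(results)
print(peak_buffered)
```

Four requests arrive at once but only two may be in flight, so the other two wait in the router rather than piling up on the replicas. That waiting period is why the time between submitting a request and having it assigned to a replica can vary.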

A ServeHandle takes request parameters and returns a future object of type ray.ObjectRef, whose value is filled in with the result object once it is available. Because of the internal buffering, the time from submitting a request to getting a ray.ObjectRef can vary.

Because of this variability, Serve offers synchronous and asynchronous versions of the handle, so that the buffering period is handled efficiently:

  • RayServeSyncHandle directly returns a ray.ObjectRef. It blocks the current thread until the request is matched to a replica.

  • RayServeHandle returns an asyncio.Task upon submission. The asyncio.Task can be awaited to resolve to a ray.ObjectRef. While the current request is buffered, other requests can be processed concurrently.

serve.run deploys a deployment graph and returns the entrypoint node’s handle (the node you passed as argument to serve.run). The return type is a RayServeSyncHandle. This is useful for interacting with and testing the newly created deployment graph.

import ray
from ray import serve
from ray.serve.handle import RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self) -> str:
        return "hello"


handle: RayServeSyncHandle = serve.run(Model.bind())
ref: ray.ObjectRef = handle.remote()  # blocks until request is assigned to replica
assert ray.get(ref) == "hello"

In all other cases, RayServeHandle is the default because the API is more performant than its blocking counterpart. For example, when implementing a dynamic dispatch node in a deployment graph, the handle is asynchronous.

import asyncio
import random
import ray
from ray import serve
from ray.serve.handle import RayServeHandle, RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self) -> str:
        return "hello"


@serve.deployment
class DynamicDispatcher:
    def __init__(self, handle_a: RayServeHandle, handle_b: RayServeHandle):
        self.handle_a = handle_a
        self.handle_b = handle_b

    async def __call__(self):
        handle_chosen = self.handle_a if random.random() < 0.5 else self.handle_b

        # The request is enqueued.
        submission_task: asyncio.Task = handle_chosen.remote()
        # The request is assigned to a replica.
        ref: ray.ObjectRef = await submission_task
        # The request has been processed by the replica.
        result = await ref

        return result


handle: RayServeSyncHandle = serve.run(
    DynamicDispatcher.bind(Model.bind(), Model.bind())
)
ref: ray.ObjectRef = handle.remote()
assert ray.get(ref) == "hello"

The result of handle.remote() can also be passed directly as an argument to other downstream handles, without having to await it first.

import asyncio
import ray
from ray import serve
from ray.serve.handle import RayServeHandle, RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self, inp):
        return "hello " + inp


@serve.deployment
class Chain:
    def __init__(self, handle_a: RayServeHandle, handle_b: RayServeHandle):
        self.handle_a = handle_a
        self.handle_b = handle_b

    async def __call__(self, inp):
        ref: ray.ObjectRef = await self.handle_b.remote(
            # Serve can handle enqueued-task as dependencies.
            self.handle_a.remote(inp)
        )
        return await ref


handle: RayServeSyncHandle = serve.run(Chain.bind(Model.bind(), Model.bind()))
ref: ray.ObjectRef = handle.remote("Serve")
assert ray.get(ref) == "hello hello Serve"

In both types of handles, you can call a specific method by using the .method_name accessor. For example:

import ray
from ray import serve
from ray.serve.handle import RayServeSyncHandle


@serve.deployment
class Deployment:
    def method1(self, arg: str) -> str:
        return f"Method1: {arg}"

    def __call__(self, arg: str) -> str:
        return f"__call__: {arg}"


handle: RayServeSyncHandle = serve.run(Deployment.bind())

ray.get(handle.remote("hi"))  # Defaults to calling the __call__ method.
ray.get(handle.method1.remote("hi"))  # Call a different method.

Note

ray.ObjectRef corresponds to the result of a request submission. To retrieve the result, you can use the synchronous Ray Core API ray.get(ref) or the async API await ref. To wait for the result to be available without retrieving it, you can use the synchronous API ray.wait([ref]) or the async API await asyncio.wait([ref]). You can mix and match these calls, but we recommend using async APIs to increase concurrency.