Set Up FastAPI and HTTP

This section helps you understand how to:

  • send HTTP requests to Serve deployments

  • use Ray Serve to integrate with FastAPI

  • use customized HTTP adapters

  • choose which feature to use for your use case

Choosing the right HTTP feature

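Serve offers a layered approach to exposing your model over HTTP. Depending on your use case, you can choose the level of abstraction that fits:

  • To work directly with the raw request, accept a Starlette Request in your deployment’s __call__ method.

  • To parse requests into basic Python types such as numpy arrays with little boilerplate, use HTTP adapters, for example with DAGDriver.

  • To define more complex HTTP handling logic with routing, validation, and automatic OpenAPI documentation, use the FastAPI integration.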

Calling Deployments via HTTP

When you deploy a Serve application, the ingress deployment (the one passed to serve.run) is exposed over HTTP.

import starlette.requests
import requests
from ray import serve


@serve.deployment
class Counter:
    def __call__(self, request: starlette.requests.Request):
        return request.query_params


serve.run(Counter.bind())
resp = requests.get("http://localhost:8000?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}

Requests to the Serve HTTP server at / are routed to the deployment’s __call__ method with a Starlette Request object as the sole argument. The __call__ method can return any JSON-serializable object or a Starlette Response object (e.g., to return a custom status code or custom headers).

Often for ML models, you just need the API to accept a numpy array. You can use Serve’s DAGDriver to simplify the request parsing.

import numpy as np
import requests
from ray import serve
from ray.serve.drivers import DAGDriver
from ray.serve.http_adapters import json_to_ndarray


@serve.deployment
class Model:
    def __call__(self, arr: np.ndarray):
        return arr.sum()


serve.run(DAGDriver.bind(Model.bind(), http_adapter=json_to_ndarray))
resp = requests.post("http://localhost:8000", json={"array": [[1, 2], [2, 3]]})
assert resp.json() == 8

Note

Serve provides a library of HTTP adapters to help you avoid boilerplate code. The HTTP Adapters section later on this page dives deeper into how they work.

FastAPI HTTP Deployments

If you want to define more complex HTTP handling logic, Serve integrates with FastAPI. This allows you to define a Serve deployment using the @serve.ingress decorator that wraps a FastAPI app with its full range of features. The most basic example of this is shown below, but for more details on all that FastAPI has to offer such as variable routes, automatic type validation, dependency injection (e.g., for database connections), and more, please check out their documentation.

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment(route_prefix="/hello")
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"


serve.run(MyFastAPIDeployment.bind())
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"

Now if you send a request to /hello, this will be routed to the root method of our deployment. We can also easily leverage FastAPI to define multiple routes with different HTTP methods:

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment(route_prefix="/hello")
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"

    # Each route needs its own method name; a second `root` would shadow the first.
    @app.post("/{subpath}")
    def hi(self, subpath: str):
        return f"Hello from {subpath}!"


serve.run(MyFastAPIDeployment.bind())
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"

You can also pass in an existing FastAPI app to a deployment to serve it as-is:

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@app.get("/")
def f():
    return "Hello from the root!"


@serve.deployment(route_prefix="/")
@serve.ingress(app)
class FastAPIWrapper:
    pass


serve.run(FastAPIWrapper.bind())
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"

This is useful for scaling out an existing FastAPI app with no modifications necessary. Existing middlewares, automatic OpenAPI documentation generation, and other advanced FastAPI features should work as-is.

WebSockets

Serve supports WebSockets via FastAPI:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from ray import serve


app = FastAPI()


@serve.deployment
@serve.ingress(app)
class EchoServer:
    @app.websocket("/")
    async def echo(self, ws: WebSocket):
        await ws.accept()

        try:
            while True:
                text = await ws.receive_text()
                await ws.send_text(text)
        except WebSocketDisconnect:
            print("Client disconnected.")


serve_app = serve.run(EchoServer.bind())

Decorate the function that handles WebSocket requests with @app.websocket. Read more about FastAPI WebSockets in the FastAPI documentation.

Query the deployment using the websockets package (pip install websockets):

from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Eureka!")
    assert websocket.recv() == "Eureka!"

    websocket.send("I've found it!")
    assert websocket.recv() == "I've found it!"

Streaming Responses

Some applications must stream incremental results back to the caller. This is common for text generation using large language models (LLMs) and for video processing applications. A full forward pass may take several seconds, so returning incremental results as they become available provides a much better user experience.

To use HTTP response streaming, return a StreamingResponse that wraps a generator from your HTTP handler. This is supported for basic HTTP ingress deployments using a __call__ method and when using the FastAPI integration.

The code below defines a Serve application that incrementally streams numbers up to a provided max. On the client side, it passes the stream=True option to the requests library so that it can process the output as each chunk arrives.

import time
from typing import Generator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    def generate_numbers(self, max: int) -> Generator[str, None, None]:
        for i in range(max):
            yield str(i)
            time.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = request.query_params.get("max", "25")
        gen = self.generate_numbers(int(max))
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")

Save this code in stream.py and run it:

$ python stream.py
[2023-05-25 10:44:23]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(HTTPProxyActor pid=40403) INFO:     Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms

Handling client disconnects

In some cases, you may want to cease processing a request when the client disconnects before the full stream has been returned. If you pass an async generator to StreamingResponse, it will be cancelled and raise an asyncio.CancelledError when the client disconnects. Note that you must await at some point in the generator for the cancellation to occur.

In the example below, the generator streams responses forever until the client disconnects, then it prints that it was cancelled and exits. Save this code in stream.py and run it:

import asyncio
import time
from typing import AsyncGenerator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    async def generate_forever(self) -> AsyncGenerator[str, None]:
        try:
            i = 0
            while True:
                yield str(i)
                i += 1
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            print("Cancelled! Exiting.")

    def __call__(self, request: Request) -> StreamingResponse:
        gen = self.generate_forever()
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
    if i == 10:
        print("Client disconnecting")
        break

$ python stream.py
[2023-07-10 16:08:41]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(HTTPProxyActor pid=50803) INFO:     Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms
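The cancellation mechanism described at the start of this section can be reproduced with plain asyncio, without Serve or Starlette. In this sketch, consume stands in for the server code that forwards chunks to the client, and task.cancel() stands in for the client disconnecting; that mapping is a simplifying assumption. Because the generator spends most of its time suspended at await asyncio.sleep(...), the CancelledError is raised there and the except block runs. Unlike the example above, the sketch re-raises the error so that the consuming task still ends as cancelled.

```python
import asyncio
from typing import AsyncGenerator, List


async def generate_forever(log: List[str]) -> AsyncGenerator[str, None]:
    try:
        i = 0
        while True:
            yield str(i)
            i += 1
            # Cancellation can only be delivered while the generator is suspended here.
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        log.append("cancelled")
        # Re-raise so the consuming task still finishes as cancelled.
        raise


async def consume(gen: AsyncGenerator[str, None], received: List[str]) -> None:
    # Stands in for the server task that sends chunks to the client.
    async for chunk in gen:
        received.append(chunk)


async def main() -> tuple:
    log: List[str] = []
    received: List[str] = []
    task = asyncio.create_task(consume(generate_forever(log), received))
    await asyncio.sleep(0.35)
    task.cancel()  # Simulates the client disconnecting mid-stream.
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, received


log, received = asyncio.run(main())
print(log, received)
```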

HTTP Adapters

HTTP adapters are functions that convert raw HTTP requests into basic Python types, such as a dict or a numpy array.

For example, here is an adapter that extracts the JSON content from a request:

import starlette.requests

async def json_resolver(request: starlette.requests.Request):
    return await request.json()

The input arguments to an HTTP adapter should be type-annotated. At a minimum, the adapter should accept a starlette.requests.Request type (https://www.starlette.io/requests/#request), but it can also accept any type that’s recognized by FastAPI’s dependency injection framework.

Here is an HTTP adapter that accepts two HTTP query parameters:

# YourDataClass is a placeholder for any class you define to hold the parsed fields.
def parse_query_args(field_a: int, field_b: str):
    return YourDataClass(field_a, field_b)

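You can specify other type signatures to extract other HTTP fields. For example, FastAPI’s Query, Path, Header, Cookie, and Body parameter functions control whether an argument is read from the query string, the URL path, a header, a cookie, or the request body.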

For more details, you can take a look at the FastAPI documentation.

You can use HTTP adapters in at least two places, which the following sections examine:

  • Serve Deployment Graph DAGDriver

  • Embedded in your existing FastAPI application

Serve Deployment Graph DAGDriver

When using a Serve deployment graph, you can configure ray.serve.drivers.DAGDriver to accept an HTTP adapter through its http_adapter argument.

For example, the json_request adapter parses JSON in the HTTP body:

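from ray import serve
from ray.dag.input_node import InputNode
from ray.serve.drivers import DAGDriver
from ray.serve.http_adapters import json_request

@serve.deployment
class Model:  # hypothetical deployment; replace with your own
    def predict(self, payload: dict) -> dict:
        return payload

with InputNode() as input_node:
    other_node = Model.bind().predict.bind(input_node)
    dag = DAGDriver.bind(other_node, http_adapter=json_request)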

Embedded in your existing FastAPI Application

You can also bring the adapter to your own FastAPI app using Depends. The input schema automatically becomes part of the OpenAPI schema that FastAPI generates.

from fastapi import FastAPI, Depends
from ray.serve.http_adapters import json_to_ndarray

app = FastAPI()

@app.post("/endpoint")
async def endpoint(np_array = Depends(json_to_ndarray)):
    ...

Pydantic models as adapters

Serve also supports pydantic models as a shorthand for HTTP adapters in model wrappers and DAGDriver. Instead of writing a function to define your HTTP adapter as in the examples above, you can pass a pydantic model class directly, which tells Ray Serve to validate the HTTP body against that schema. Once validated, the model instance is passed to the predictor.

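from pydantic import BaseModel
from ray.serve.drivers import DAGDriver


class User(BaseModel):
    user_id: int
    user_name: str

# ... build the rest of your deployment graph; `other_node` is its output node.

DAGDriver.bind(other_node, http_adapter=User)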
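To see which request bodies pass the User schema above, the following sketch runs the same kind of validation with plain pydantic, outside Serve. It does not show how Serve reports a failed validation to the HTTP client.

```python
from pydantic import BaseModel, ValidationError


class User(BaseModel):
    user_id: int
    user_name: str


# A well-formed body: the string "42" is coerced to the int 42 during validation.
user = User(**{"user_id": "42", "user_name": "alice"})
print(user.user_id, user.user_name)

# A malformed body: user_id cannot be parsed as an int, so validation fails.
try:
    User(**{"user_id": "not-a-number", "user_name": "bob"})
    rejected = False
except ValidationError:
    rejected = True
print(rejected)
```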

List of built-in adapters

Here is a list of adapters; please feel free to contribute more!

ray.serve.http_adapters.json_to_ndarray(payload: ray.serve.http_adapters.NdArray) -> numpy.ndarray

Accepts an NdArray JSON from an HTTP body and converts it to a numpy array.

pydantic model ray.serve.http_adapters.NdArray

Schema for numeric array input.

JSON schema:
{
   "title": "NdArray",
   "description": "Schema for numeric array input.",
   "type": "object",
   "properties": {
      "array": {
         "title": "Array",
         "description": "The array content as a nested list. You can pass in 1D to 4D array as nested list, or flatten them. When you flatten the array, you can use the `shape` parameter to perform reshaping.",
         "anyOf": [
            {
               "type": "array",
               "items": {
                  "type": "number"
               }
            },
            {
               "type": "array",
               "items": {
                  "type": "array",
                  "items": {
                     "type": "number"
                  }
               }
            },
            {
               "type": "array",
               "items": {
                  "type": "array",
                  "items": {
                     "type": "array",
                     "items": {
                        "type": "number"
                     }
                  }
               }
            },
            {
               "type": "array",
               "items": {
                  "type": "array",
                  "items": {
                     "type": "array",
                     "items": {
                        "type": "array",
                        "items": {
                           "type": "number"
                        }
                     }
                  }
               }
            }
         ]
      },
      "shape": {
         "title": "Shape",
         "description": "The shape of the array. If present, the array will be reshaped.",
         "type": "array",
         "items": {
            "type": "integer"
         }
      },
      "dtype": {
         "title": "Dtype",
         "description": "The numpy dtype of the array. If present, the array will be cast by `astype`.",
         "type": "string"
      }
   },
   "required": [
      "array"
   ]
}

Fields
field array: Union[List[float], List[List[float]], List[List[List[float]]], List[List[List[List[float]]]]] [Required]

The array content as a nested list. You can pass in 1D to 4D array as nested list, or flatten them. When you flatten the array, you can use the shape parameter to perform reshaping.

field dtype: Optional[str] = None

The numpy dtype of the array. If present, the array will be cast by astype.

field shape: Optional[List[int]] = None

The shape of the array. If present, the array will be reshaped.

PublicAPI (beta): This API is in beta and may change before becoming stable.
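The field descriptions above imply how a request body maps to an array. The function below is a hypothetical re-implementation of that mapping (np.array, then an optional reshape by shape, then an optional astype cast by dtype), not the source of json_to_ndarray. It shows that the nested body {"array": [[1, 2], [2, 3]]} and the flattened body {"array": [1, 2, 2, 3], "shape": [2, 2]} describe the same matrix.

```python
from typing import List, Optional

import numpy as np


def to_ndarray(
    array: list,
    shape: Optional[List[int]] = None,
    dtype: Optional[str] = None,
) -> np.ndarray:
    """Hypothetical re-implementation of the NdArray -> numpy conversion."""
    arr = np.array(array)
    if shape:
        # A flattened array plus `shape` reconstructs the original dimensions.
        arr = arr.reshape(*shape)
    if dtype:
        # Cast with astype, as the `dtype` field description states.
        arr = arr.astype(dtype)
    return arr


# The same 2x2 matrix, sent nested and sent flattened with an explicit shape.
nested = to_ndarray([[1, 2], [2, 3]])
flat = to_ndarray([1, 2, 2, 3], shape=[2, 2], dtype="float32")
print(nested.sum(), flat.dtype, np.array_equal(nested, flat))
```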

ray.serve.http_adapters.json_to_multi_ndarray(payload: Dict[str, ray.serve.http_adapters.NdArray]) -> Dict[str, numpy.ndarray]

Accepts a JSON of shape {str_key: NdArray} and converts it to a dict of numpy arrays.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.serve.http_adapters.starlette_request(request: starlette.requests.Request) -> starlette.requests.Request

Returns the raw request object.

PublicAPI (beta): This API is in beta and may change before becoming stable.

async ray.serve.http_adapters.json_request(request: starlette.requests.Request) -> Dict[str, Any]

Returns the JSON object from the request body.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.serve.http_adapters.image_to_ndarray(img: bytes = File(Ellipsis)) -> numpy.ndarray

Accepts a PIL-readable file from an HTTP form and converts it to a numpy array.

PublicAPI (beta): This API is in beta and may change before becoming stable.

async ray.serve.http_adapters.pandas_read_json(raw_request: starlette.requests.Request)

Accepts a JSON body and converts it into a pandas DataFrame.

This function simply uses pandas.read_json(body, **query_params) under the hood.

PublicAPI (beta): This API is in beta and may change before becoming stable.
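As an illustration of the documented pandas.read_json(body, **query_params) call, the sketch below parses a records-oriented JSON body as if the request URL carried ?orient=records. It calls pandas directly, so it only approximates what the adapter does; wrapping the body in io.StringIO is this sketch’s own choice, since recent pandas versions warn when given a literal JSON string.

```python
import io

import pandas as pd

# A JSON body a client might send, plus query parameters such as ?orient=records.
body = '[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]'
query_params = {"orient": "records"}

# Mirrors pandas.read_json(body, **query_params) with the body wrapped in StringIO.
df = pd.read_json(io.StringIO(body), **query_params)
print(df)
```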