
ResNet50

Overview

In this tutorial you will deploy a precompiled ResNet50 model on Ray Serve with RBLN NPUs.

The workflow covers:

  1. Verifying the environment and compiled model.
  2. Defining a Ray Serve deployment that targets RBLN hardware.
  3. Launching the application with the Serve CLI.
  4. Sending an inference request to validate the endpoint.
  5. Extending the deployment with batching and bucketing.

If you need help configuring Ray Serve itself, review the Ray Serve overview first. For a complete script-based example (from compilation to deployment), see the model zoo reference.

Setup & Installation

Before you begin, ensure that your system environment is properly configured and that all required packages are installed. This includes:

  • System Requirements:
    • Ubuntu 20.04 LTS (Debian bullseye) or higher
    • A system equipped with RBLN NPUs (e.g., RBLN ATOM™)
  • Package Requirements:
  • Installation Command:
    pip install -U "ray[serve]" requests torch --extra-index-url https://download.pytorch.org/whl/cpu
    

Note

The following sections assume you already understand how to compile and execute models with the RBLN SDK. Revisit the PyTorch or TensorFlow tutorials and the Python API guide if you need a refresher.

Prerequisites

Prepare the Compiled Model

Note

Prepare the compiled model artifact ahead of time (for example, resnet50.rbln generated in the PyTorch ResNet50 tutorial). The steps below focus on serving that prebuilt binary with Ray Serve.

Deployment Flow

Deployment Overview

Step | Description
1. Deployment implementation | Configure Ray to use RBLN NPUs and define the Ray Serve deployment that loads the compiled model, initializes the runtime, and exposes an endpoint.
2. Execution | Launch the deployment with the Ray Serve CLI (serve run), optionally configuring application names, device sets, or remote Ray clusters.
3. Inference request | Send an HTTP request to the Serve endpoint and inspect the response to validate the deployment.

The sections below walk through these steps in order.

1.1 Resource Allocation

Ray exposes custom accelerators through the resources argument, so each task or deployment can request exactly the hardware it needs.

The RBLNActor class below requests one RBLN resource with @ray.remote(resources={"RBLN": 1}); increase the value whenever your deployment needs more cards. Its getDeviceId method retrieves the assigned device ID, which the Serve deployment uses to select the device. See RBLN NPUs with Ray for additional background.

@ray.remote(resources={"RBLN": 1})
class RBLNActor:
    def getDeviceId(self):
        return ray.get_runtime_context().get_accelerator_ids()["RBLN"]

1.2 Deployment Definition

Ray Serve deployments are defined by annotating a class or function with @serve.deployment. This decorator registers the class as a Ray Serve service endpoint, allowing Ray Serve to manage the lifecycle (deployment, scaling, updates).

resnet50.py
# File name: resnet50.py
import io
import json
import os

import ray
import rebel
import torch
from PIL import Image
from ray import serve
from starlette.requests import Request
from torchvision.models import ResNet50_Weights

ray.init(resources={"RBLN": 1})


@ray.remote(resources={"RBLN": 1})
class RBLNActor:
    def getDeviceId(self):
        return ray.get_runtime_context().get_accelerator_ids()["RBLN"]


@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 4})
class Resnet50:
    def __init__(self, rbln_actor: RBLNActor):
        self.initialized = False
        self.weights = None
        self.rbln_actor = rbln_actor
        self.ids = ray.get(rbln_actor.getDeviceId.remote())
        self.rbln_devices()
        self.initialize()

    def initialize(self):
        """
        Initialize model. This will be called during model loading time
        :return:
        """
        model_path = "./resnet50.rbln"
        if not os.path.isfile(model_path):
            raise RuntimeError(
                f"[RBLN ERROR] File not found at the specified model_path({model_path})."
            )
        self.module = rebel.Runtime(
            model_path, tensor_type="pt", device=int(self.ids[0])
        )
        self.weights = ResNet50_Weights.DEFAULT
        self.initialized = True

    def rbln_devices(self):
        """
        Redefine the environment variables to be passed to the RBLN runtime
        :return:
        """
        if not self.ids:
            # No device assigned: clear any stale value and stop here.
            os.environ.pop("RBLN_DEVICES", None)
            return
        os.environ["RBLN_DEVICES"] = ",".join(self.ids)

    def preprocess(self, input_data):
        """
        Transform raw input into model input data.
        :param input_data: raw request body (encoded image bytes)
        :return: preprocessed model input with a leading batch dimension
        """
        if input_data is None:
            raise ValueError("[RBLN][ERROR] Data not found with client request.")
        if not isinstance(input_data, (bytes, bytearray)):
            raise ValueError("[RBLN][ERROR] Input data is not binary data.")

        try:
            image = Image.open(io.BytesIO(input_data))
        except Exception as e:
            raise ValueError(f"[RBLN][ERROR]Invalid image data: {e}") from e
        prep = self.weights.transforms()
        batch = prep(image).unsqueeze(0)
        preprocessed_data = batch.numpy()

        return torch.from_numpy(preprocessed_data)

    def inference(self, model_input):
        """
        Run inference on the RBLN runtime.
        :param model_input: transformed model input data
        :return: model output tensor
        """

        model_output = self.module.run(model_input)
        return model_output

    def postprocess(self, inference_output):
        """
        Return inference result.
        :param inference_output: model output for a single image
        :return: predicted category name
        """
        score, class_id = torch.topk(inference_output, 1, dim=1)
        # class_id has shape (1, 1); convert it to a plain int before indexing.
        category_name = self.weights.meta["categories"][class_id.item()]
        return category_name

    def handle(self, data):
        """
        Invoked by __call__ for each prediction request.
        Pre-processes the data, runs the model, and post-processes the prediction output.
        :param data: Input data for prediction
        :return: prediction output as a JSON string
        """
        model_input = self.preprocess(data)
        model_output = self.inference(model_input)
        category_name = self.postprocess(model_output)

        return json.dumps({"result": category_name})

    async def __call__(self, http_request: Request) -> str:
        image_byte = await http_request.body()
        return self.handle(image_byte)


rbln_actor = RBLNActor.remote()
app = Resnet50.bind(rbln_actor)
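
The environment handling in rbln_devices can be checked without any NPU attached. The sketch below is a minimal, standalone illustration of that logic, not part of the RBLN SDK: the helper name apply_rbln_devices and the idea of passing the environment as a mapping are assumptions made here so the behavior can be exercised against a plain dict instead of os.environ.

```python
from typing import MutableMapping, Optional, Sequence


def apply_rbln_devices(
    ids: Optional[Sequence[str]], env: MutableMapping[str, str]
) -> None:
    """Mirror the assigned device IDs into RBLN_DEVICES, or clear the variable.

    Hypothetical helper: the name and signature are illustrative only.
    """
    if not ids:
        # pop() with a default never raises, even when the variable is unset.
        env.pop("RBLN_DEVICES", None)
        return
    # str() guards against IDs that arrive as integers rather than strings.
    env["RBLN_DEVICES"] = ",".join(str(i) for i in ids)


# Exercise the helper against a plain dict instead of os.environ.
demo_env = {"RBLN_DEVICES": "7"}
apply_rbln_devices(["0", "1"], demo_env)
print(demo_env)  # {'RBLN_DEVICES': '0,1'}
apply_rbln_devices(None, demo_env)
print(demo_env)  # {}
```

Inside a deployment, the same helper could be called with os.environ as the second argument.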

2. Execution

Use the Ray Serve CLI (serve run) to launch the application. The argument uses the module:application format, where module is the Python filename (without .py) and application is the exported Serve entry point.

In this sample, resnet50.py defines app, so the following command starts the deployment. Add extra options when connecting to a remote Ray cluster or when pinning RBLN_DEVICES to specific cards.

$ serve run resnet50:app --name "resnet50"

Example Output:

Application 'resnet50' is ready at http://127.0.0.1:8000/.

3. Inference Request Example

Download a sample image for the ResNet50 inference request, then issue an HTTP POST with curl to verify the endpoint.

# Download a sample image
$ wget https://rbln-public.s3.ap-northeast-2.amazonaws.com/images/tabby.jpg

# Send an inference request
$ curl -X POST http://localhost:8000/ --header "Content-Type: image/jpeg" --data-binary @tabby.jpg | jq .

Example Output:

{
  "result": "tabby"
}
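
The same request can be issued from Python. The following is a minimal sketch using only the standard library; the function names build_request and classify are illustrative, and the URL and JSON response shape are taken from the curl example above rather than from any client API.

```python
import json
import urllib.request


def build_request(url: str, image_bytes: bytes) -> urllib.request.Request:
    """Build the same POST that the curl command sends."""
    return urllib.request.Request(
        url,
        data=image_bytes,
        headers={"Content-Type": "image/jpeg"},
        method="POST",
    )


def classify(url: str, image_bytes: bytes, timeout: float = 30.0) -> dict:
    """Send the image and decode the JSON body returned by the deployment."""
    request = build_request(url, image_bytes)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the application running, calling classify("http://localhost:8000/", image_bytes), where image_bytes holds the contents of tabby.jpg, should return a dictionary equivalent to the JSON shown in the example output.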

Advanced Features

Batch Inference

Ray Serve can accumulate multiple requests and process them together. Define the entry function as async def, accept a List of requests, and decorate the method with @serve.batch. Tune throughput and latency with the knobs below:

  • max_batch_size: Maximum number of requests grouped per inference call.
  • batch_wait_timeout_s: Maximum time, in seconds, to wait for additional requests before dispatching the current batch.
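
To make the interaction between the two knobs concrete, here is a conceptual sketch of request batching written with plain asyncio. It is not Ray Serve's implementation: the MicroBatcher class and its internals are assumptions made purely for illustration. A batch is dispatched as soon as it reaches the maximum size or the wait timeout expires, whichever comes first.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class MicroBatcher:
    """Conceptual stand-in for request batching; not Ray Serve's implementation."""

    def __init__(
        self,
        handler: Callable[[List[int]], Awaitable[List[int]]],
        max_batch_size: int,
        batch_wait_timeout_s: float,
    ) -> None:
        self._handler = handler
        self._max_batch_size = max_batch_size
        self._timeout_s = batch_wait_timeout_s
        self._queue: Optional[asyncio.Queue] = None
        self._worker: Optional[asyncio.Task] = None

    async def submit(self, item: int) -> int:
        # Create the queue and worker lazily so they bind to the running loop.
        if self._queue is None:
            self._queue = asyncio.Queue()
            self._worker = asyncio.create_task(self._run())
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((item, future))
        return await future

    async def _run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            # Block until the first request arrives, then start the timer.
            batch = [await self._queue.get()]
            deadline = loop.time() + self._timeout_s
            while len(batch) < self._max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self._queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            # One handler call serves the whole batch, one result per request.
            results = await self._handler([item for item, _ in batch])
            for (_, future), result in zip(batch, results):
                future.set_result(result)


async def demo():
    batch_sizes = []

    async def handler(items: List[int]) -> List[int]:
        batch_sizes.append(len(items))
        return [x * 2 for x in items]

    batcher = MicroBatcher(handler, max_batch_size=4, batch_wait_timeout_s=0.05)
    results = await asyncio.gather(*(batcher.submit(i) for i in range(6)))
    return results, batch_sizes


results, batch_sizes = asyncio.run(demo())
print(results)  # [0, 2, 4, 6, 8, 10]
print(batch_sizes)
```

A larger maximum batch size generally favors throughput, while a shorter timeout bounds the extra latency that a lone request pays while waiting for companions.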

Model Compilation

The RBLN compiler supports a “bucketing” mode that compiles the same model for multiple input shapes. Bucketing lets a single deployment handle a range of batch sizes efficiently without recompiling. See the Bucketing Tutorial for a deeper dive.
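
As a conceptual illustration of bucketing, the sketch below maps an incoming batch size to the smallest compiled batch size that can hold it. This is an assumption-laden illustration, not the RBLN runtime's logic: the source does not state how the runtime selects a bucket or whether it pads, and the names pick_bucket and padding_needed are hypothetical.

```python
from bisect import bisect_left
from typing import Sequence


def pick_bucket(batch_size: int, buckets: Sequence[int]) -> int:
    """Return the smallest compiled batch size that can hold batch_size."""
    ordered = sorted(buckets)
    idx = bisect_left(ordered, batch_size)
    if batch_size < 1 or idx == len(ordered):
        raise ValueError(f"no bucket fits batch size {batch_size}")
    return ordered[idx]


def padding_needed(batch_size: int, buckets: Sequence[int]) -> int:
    """Number of dummy rows required to fill the chosen bucket."""
    return pick_bucket(batch_size, buckets) - batch_size


print(pick_bucket(3, [1, 2, 4, 8]))     # 4
print(padding_needed(3, [1, 2, 4, 8]))  # 1
print(pick_bucket(3, [1, 2, 3, 4]))     # 3
```

When every batch size up to the maximum has its own bucket, as with [1, 2, 3, 4], each incoming batch matches a bucket exactly and no padding is needed.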

bucketing_compile.py
import rebel
import torch
from torchvision.models import ResNet50_Weights, resnet50

# Instantiate TorchVision ResNet50 model
weights = ResNet50_Weights.DEFAULT
model = resnet50(weights=weights)
model.eval()

size = 224
batches = [1, 2, 3, 4] # Supported batch sizes
input_infos = []

# Create input information for each batch size
for batch in batches:
    input_info = [("input_np", [batch, 3, size, size], "float32")]
    input_infos.append(input_info)

# Compile the model with the pre-defined input information
compiled_model = rebel.compile_from_torch(model, input_info=input_infos)

# Save the compiled model to local storage
compiled_model.save("resnet50_bucketing.rbln")

Batch Deployment with Bucketing

Combine a bucketing-enabled model with Ray Serve’s @serve.batch decorator to serve dynamic batch sizes efficiently. Execution and inference follow the Deployment Flow section: launch the application with serve run, using the new module name (resnet50_batch:app), and send the same HTTP request.

resnet50_batch.py
# File name: resnet50_batch.py
import io
import json
import os
from typing import List

import numpy as np
import ray
import rebel
import torch
from PIL import Image
from ray import serve
from starlette.requests import Request
from torchvision.models import ResNet50_Weights

ray.init(resources={"RBLN": 1})


@ray.remote(resources={"RBLN": 1})
class RBLNActor:
    def getDeviceId(self):
        return ray.get_runtime_context().get_accelerator_ids()["RBLN"]


@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 4})
class Resnet50:
    async def __init__(self, rbln_actor: RBLNActor):
        self.initialized = False
        self.model = None
        self.weights = None
        self.prep = None
        self.rbln_actor = rbln_actor
        self.ids = ray.get(rbln_actor.getDeviceId.remote())
        self.batch_size = 0
        await self.rbln_devices()
        await self.initialize()

    async def initialize(self):
        """
        Initialize model. This will be called during model loading time
        :return:
        """
        model_path = "./resnet50_bucketing.rbln"
        if not os.path.isfile(model_path):
            raise RuntimeError(
                f"[RBLN ERROR] File not found at the specified model_path({model_path})."
            )
        compiled_model = rebel.RBLNCompiledModel(model_path)
        self.module = rebel.AsyncRuntime(
            compiled_model, tensor_type="pt", device=int(self.ids[0])
        )
        self.weights = ResNet50_Weights.DEFAULT  # Initialize weights first
        self.prep = self.weights.transforms()
        self.initialized = True

    async def rbln_devices(self):
        """
        Redefine the environment variables to be passed to the RBLN runtime
        :return:
        """
        if not self.ids:
            # No device assigned: clear any stale value and stop here.
            os.environ.pop("RBLN_DEVICES", None)
            return
        os.environ["RBLN_DEVICES"] = ",".join(self.ids)

    async def preprocess(self, input_data_list):
        """
        Transform raw input into model input data.
        :param input_data_list: list of raw requests, should match batch size
        :return: list of preprocessed model input data
        """
        preprocessed_batch = []

        for input_data in input_data_list:
            if input_data is None:
                raise ValueError("[RBLN][ERROR] Data not found with client request.")
            if not isinstance(input_data, (bytes, bytearray)):
                raise ValueError("[RBLN][ERROR] Input data is not binary data.")

            try:
                image = Image.open(io.BytesIO(input_data))
            except Exception as e:
                raise ValueError(f"[RBLN][ERROR]Invalid image data: {e}") from e
            processed_image = self.prep(image).unsqueeze(0)
            preprocessed_batch.append(processed_image)

        preprocessed_data = np.concatenate(preprocessed_batch, axis=0).copy()

        return torch.from_numpy(preprocessed_data)

    async def inference(self, model_input):
        """
        Run inference on the RBLN runtime.
        :param model_input: transformed model input data (batch)
        :return: model output tensor for the whole batch
        """
        task = self.module.run(model_input)
        return task.wait()

    async def postprocess(self, inference_output):
        """
        Return inference result.
        :param inference_output: batch of inference output
        :return: list of predict results
        """
        all_category_names = []
        chunky_batched_result = np.array_split(
            inference_output, self.batch_size, axis=0
        )

        for result in chunky_batched_result:
            score, class_id = torch.topk(result, 1, dim=1)
            batch_category_names = []
            for i in range(class_id.shape[0]):
                category_name = self.weights.meta["categories"][class_id[i].item()]
                batch_category_names.append(category_name)
            all_category_names.extend(batch_category_names)

        return all_category_names

    @serve.batch(max_batch_size=4, batch_wait_timeout_s=0.5)
    async def __call__(self, http_requests: List[Request]) -> List[str]:
        """
        Handle batch of HTTP requests
        :param http_requests: List of HTTP requests
        :return: List of JSON string results
        """

        self.batch_size = len(http_requests)
        image_bytes_list = []
        for request in http_requests:
            image_byte = await request.body()
            image_bytes_list.append(image_byte)

        # Process batch
        model_input = await self.preprocess(image_bytes_list)
        model_output = await self.inference(model_input)
        batched_category_names = await self.postprocess(model_output)

        # Return list of JSON strings for each prediction
        results = []
        for idx, category_name in enumerate(batched_category_names):
            results.append(json.dumps({f"{idx}": category_name}))

        return results


rbln_actor = RBLNActor.remote()
app = Resnet50.bind(rbln_actor)
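
Batching only takes effect when several requests arrive within the wait window, so a sequential client will not exercise it. The sketch below shows one way to issue requests concurrently and decode the per-request responses. It is a standalone illustration: send_concurrently and its injectable send callable are hypothetical names, and the fake sender stands in for a real HTTP POST so the example runs without a server.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def send_concurrently(
    send: Callable[[bytes], str], payloads: List[bytes], workers: int = 4
) -> List[str]:
    """Issue all payloads in parallel and return the labels in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        bodies = list(pool.map(send, payloads))
    # Each body is a one-entry JSON object such as {"0": "tabby"};
    # the key is the request's position within the server-side batch.
    return [next(iter(json.loads(body).values())) for body in bodies]


def fake_send(payload: bytes) -> str:
    """Stand-in for an HTTP POST; echoes the payload as the predicted label."""
    return json.dumps({"0": payload.decode("utf-8")})


labels = send_concurrently(fake_send, [b"tabby", b"tiger cat", b"lynx"])
print(labels)  # ['tabby', 'tiger cat', 'lynx']
```

Against a running deployment, send would be replaced by a function that posts the image bytes to the Serve endpoint and returns the response body as text.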

Execution & Inference Reminder

The execution command and inference request follow the steps in the Deployment Flow section. When you enable batching or bucketing, substitute the new module name in the CLI argument (for example, serve run resnet50_batch:app).
