Skip to content

Concurrent Processing

Achieving full utilization when inferring a deep learning model with accelerators is a complex task. Each inference operation inherently involves some waiting time. To understand this better, it is essential to provide a concise explanation of the process. The following steps must be executed to obtain inference results from the accelerator:

  1. Loading (and preprocessing) inputs - host (CPU)
  2. Feeding into the accelerator - host-accelerator IO (DMA)
  3. Running workloads - accelerator
  4. Getting outputs - accelerator-host IO (DMA)

The host CPU is actively involved in step 1, while the accelerator takes over in step 3. Additional DMA transfers may occur during step 3 if processing on the host is beneficial.

In a multiple inference scenario, where each inference is data-independent, RBLN SDK can optimize the utilization of the aforementioned steps by implementing the concept of concurrency. This involves processing multiple samples simultaneously, thereby filling idle time gaps with other inferences and ensuring maximal utilization of the device.

RBLN SDK offers a straightforward and user-friendly set of asynchronous APIs for achieving concurrency. This document provides an overview of our asynchronous APIs with two examples, TensorFlow and PyTorch, demonstrating how to effectively utilize them.

Prerequisite

Before getting started, please make sure you have installed the following pip packages in your system:

How to use

Once the DL model has been compiled, you will obtain an RBLNCompiledModel object. To utilize an asynchronous runtime, create a runtime in async mode:

1
2
3
4
import rebel

compiled_model: rebel.RBLNCompiledModel = rebel.compile_from_torchscript(model)
runtime: rebel.AsyncRuntime = compiled_model.create_async_runtime()

When a pre-compiled and saved model *.rbln is available from local storage, it is possible to directly create an asynchronous runtime as below:

1
2
3
import rebel

runtime = rebel.AsyncRuntime("/path/to/saved_file.rbln")

If your program utilizes native asyncio as the event loop, using AsyncRuntime.async_run is an excellent choice for Pythonic programming, as it adheres to the PEP-492 async and await syntax:

output = await runtime.async_run(input_)

If the caller program runs on a different type of event loop, such as PyQt or gevent, the invocations and joins need to be managed manually in accordance with the logic. To facilitate this, RBLN SDK also offers AsyncRuntime.run, which is a straightforward asynchronous version of the run function.

1
2
3
4
5
task = runtime.run(input_)

# Do other jobs...

output = task.wait()

Examples

TensorFlow - UNet

UNet is renowned for its ability to provide pixel-level predictions and is widely applied in tasks such as medical image biomarker detection, instance segmentation, depth estimation, and more. Unlike simple classification models, UNet takes an input image and generates output prediction images, necessitating higher transaction bandwidth. This, in turn, increases the communication time between the host and the accelerator.

In this example, we will demonstrate a depth estimation task following the official Keras Tutorial. The model has been trained on the NYU Depth V2 dataset.

Below are step-by-step examples for efficiently inferring UNet using the RBLN NPU. To begin, run the following bash script to obtain the dataset:

1
2
3
4
curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000000.tar
curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000001.tar
tar -xf val-000000.tar
tar -xf val-000001.tar

In this tutorial, we used 100 samples for the test:

from pathlib import Path

import cv2
import h5py
import numpy as np
import tensorflow as tf


class NYUDepthV2Dataset():
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val" / "official"
        self.samples = sorted(image_root.glob("*.h5"))

    def __getitem__(self, index):
        h5f = h5py.File(self.samples[index], "r")
        image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
        image = cv2.resize(image, (512, 512))
        image = tf.image.convert_image_dtype(image, tf.float32)
        return image


dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

Once the sample data is prepared, we can simply compile the model as below:

import rebel
import tensorflow as tf
import numpy as np

from huggingface_hub import from_pretrained_keras

model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
    tf.function(lambda x : model(x)),
    input_info=[("x", [1, 512, 512, 3], np.float32)],
)

By creating the async runtime, we can run it concurrently as below:

import asyncio

runtime = compiled_model.create_async_runtime()


async def main() -> None:
    tasks = []

    for sample in samples:
        sample = np.expand_dims(sample.numpy(), axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)

    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())

Below is the complete code that includes all the steps above to execute UNet using our asynchronous runtime APIs:

import cv2
import h5py
import numpy as np
import rebel
import tensorflow as tf
import asyncio

from pathlib import Path
from huggingface_hub import from_pretrained_keras


class NYUDepthV2Dataset():
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val" / "official"
        self.samples = sorted(image_root.glob("*.h5"))

    def __getitem__(self, index):
        h5f = h5py.File(self.samples[index], "r")
        image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
        image = cv2.resize(image, (512, 512))
        image = tf.image.convert_image_dtype(image, tf.float32)
        return image


dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
    tf.function(lambda x : model(x)),
    input_info=[("x", [1, 512, 512, 3], np.float32)],
)


runtime = compiled_model.create_async_runtime()


async def main() -> None:
    tasks = []

    for sample in samples:
        sample = np.expand_dims(sample.numpy(), axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)

    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())

PyTorch - YOLOv8

YOLOv8 is a popular object detection model. In this example, we will use YOLOv8 with the COCO dataset. To access the dataset, please refer to the link.

Let's assume that the images and the annotation file for the 2017/validation set are stored in the current directory:

images: ./val2017
annotation file: ./annotations/instances_val2017.json

Below is the complete code to execute YOLOv8 using our asynchronous runtime APIs:

from pathlib import Path

import numpy as np
import torch
import rebel
from PIL import Image
from pycocotools.coco import COCO
from ultralytics import YOLO
from ultralytics.data.augment import LetterBox

class COCO2017Dataset():
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val2017"
        self.coco_gt = COCO(dataset_root / "annotations" / "instances_val2017.json")

        # Create sample
        self.samples: typing.List[Sample] = []
        for image_id in self.coco_gt.getImgIds():
            self.samples.append(
                (
                    image_id,
                    image_root / self.coco_gt.imgs[image_id]["file_name"],
                )
            )

    def __getitem__(self, index):
        image_id, image_path = self.samples[index]
        image = Image.open(image_path)
        letterbox_transform = LetterBox()
        array = np.asarray(image, dtype=np.float32)
        array = letterbox_transform(image=array)
        array = np.transpose(array, [2, 0, 1])
        array = np.ascontiguousarray(array)
        array /= 255
        return array

dataset = COCO2017Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

model = YOLO(f"yolov8n.pt").model.eval()
model(torch.zeros([1, 3, 640, 640], dtype=torch.float32))
scripted_model = torch.jit.trace(model, torch.zeros([1, 3, 640, 640], dtype=torch.float32))
compiled_model = rebel.compile_from_torchscript(scripted_model)

import asyncio

runtime = compiled_model.create_async_runtime()


async def main() -> None:
    tasks = []

    for sample in samples:
        sample = np.expand_dims(sample, axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)

    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())