Concurrent Processing
Achieving full accelerator utilization during deep learning inference is difficult, because every inference operation inherently involves some waiting time. To see where that time goes, consider the steps that must be executed to obtain inference results from the accelerator:
1. Loading (and preprocessing) inputs - host (CPU)
2. Feeding into the accelerator - host-accelerator IO (DMA)
3. Running workloads - accelerator
4. Getting outputs - accelerator-host IO (DMA)
The host CPU is active in step 1, while the accelerator takes over in step 3. Additional DMA transfers may occur during step 3 if part of the workload is better processed on the host.
When multiple inferences are data-independent, RBLN SDK can improve utilization across these steps through concurrency: several samples are processed at the same time, so the idle gaps of one inference are filled by the work of others and the device stays as busy as possible.
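The effect of overlapping the steps can be illustrated with a small simulation. This is only a sketch: it does not use RBLN SDK at all, the stage durations are made-up numbers, and `asyncio.sleep` stands in for real host, DMA, and accelerator work. An `asyncio.Lock` models the assumption that only one workload computes on the device at a time.

```python
import asyncio
import time

# Hypothetical stage durations in seconds; real values depend on model and hardware.
LOAD_S, DMA_IN_S, COMPUTE_S, DMA_OUT_S = 0.02, 0.005, 0.02, 0.005


async def infer(sample_id: int, device: asyncio.Lock) -> int:
    await asyncio.sleep(LOAD_S)      # step 1: load and preprocess on the host
    await asyncio.sleep(DMA_IN_S)    # step 2: host-to-accelerator transfer
    async with device:               # step 3: one workload computes at a time
        await asyncio.sleep(COMPUTE_S)
    await asyncio.sleep(DMA_OUT_S)   # step 4: accelerator-to-host transfer
    return sample_id


async def run_sequential(n: int) -> float:
    # Each inference starts only after the previous one has fully finished.
    device = asyncio.Lock()
    start = time.perf_counter()
    for i in range(n):
        await infer(i, device)
    return time.perf_counter() - start


async def run_concurrent(n: int) -> float:
    # All inferences are in flight together; only the compute step is serialized.
    device = asyncio.Lock()
    start = time.perf_counter()
    await asyncio.gather(*(infer(i, device) for i in range(n)))
    return time.perf_counter() - start


sequential_s = asyncio.run(run_sequential(8))
concurrent_s = asyncio.run(run_concurrent(8))
print(f"sequential: {sequential_s:.3f}s, concurrent: {concurrent_s:.3f}s")
```

In the concurrent run, the simulated device is rarely idle because loading and transfers of other samples happen while one sample is computing. The actual gain on real hardware depends on how the stage times compare.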
RBLN SDK provides a simple set of asynchronous APIs for achieving concurrency. This document gives an overview of these APIs and demonstrates how to use them with two examples, one in TensorFlow and one in PyTorch.
Prerequisite
Before getting started, please make sure you have installed the following pip packages in your system:
How to use
Once the DL model has been compiled, you will obtain an RBLNCompiledModel object. To use an asynchronous runtime, create a runtime in async mode:
| import rebel
compiled_model: rebel.RBLNCompiledModel = rebel.compile_from_torchscript(model)
runtime: rebel.AsyncRuntime = compiled_model.create_async_runtime()
|
When a pre-compiled and saved model (*.rbln) is available in local storage, you can create an asynchronous runtime directly from the file:
| import rebel
runtime = rebel.AsyncRuntime("/path/to/saved_file.rbln")
|
If your program uses the native asyncio event loop, AsyncRuntime.async_run is a natural choice, as it follows the async and await syntax of PEP 492:
| output = await runtime.async_run(input_)
|
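When several awaitables are collected and awaited together with `asyncio.gather`, the results come back in submission order even if the individual calls finish in a different order. The sketch below shows this with a stand-in coroutine; `fake_async_run` is hypothetical and only imitates an inference call with a random delay, it is not part of RBLN SDK.

```python
import asyncio
import random


async def fake_async_run(x: int) -> int:
    # Hypothetical stand-in for an inference call: finishes after a random short delay.
    await asyncio.sleep(random.uniform(0.0, 0.01))
    return x * 2


async def main() -> list:
    inputs = list(range(10))
    # gather schedules every call, waits for all of them, and returns
    # the results in the same order as the arguments it was given.
    return await asyncio.gather(*(fake_async_run(x) for x in inputs))


outputs = asyncio.run(main())
print(outputs)
```

Because the order is preserved, `outputs[i]` can be matched to the i-th input without extra bookkeeping.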
If the caller program runs on a different type of event loop, such as PyQt or gevent, you need to manage invocations and joins manually according to your program's logic. For this case, RBLN SDK also offers AsyncRuntime.run, a straightforward asynchronous version of the run function:
| task = runtime.run(input_)
# Do other jobs...
output = task.wait()
|
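The submit-then-wait pattern pays off when several tasks are submitted before any of them is waited on. The following sketch imitates that pattern with the standard library only. `FakeRuntime` and `FakeTask` are hypothetical stand-ins backed by a thread pool; they mirror the `run` and `wait` calls shown above but say nothing about how RBLN SDK implements them.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


class FakeTask:
    """Hypothetical handle that mimics the wait() call shown above."""

    def __init__(self, future: Future):
        self._future = future

    def wait(self):
        # Block until the result is ready, then return it.
        return self._future.result()


class FakeRuntime:
    """Hypothetical stand-in runtime; a thread pool plays the role of the device."""

    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=4)

    def run(self, x):
        # Returns immediately with a task handle instead of the result.
        return FakeTask(self._pool.submit(self._infer, x))

    @staticmethod
    def _infer(x):
        time.sleep(0.01)  # pretend inference latency
        return x + 1


runtime = FakeRuntime()
tasks = [runtime.run(x) for x in range(8)]   # submit everything first
host_side_work = sum(range(1000))            # do other jobs in the meantime
outputs = [task.wait() for task in tasks]    # join in submission order
print(outputs)
```

Calling `wait()` right after each `run()` would bring back the waiting time between inferences, so the joins are deferred until all tasks have been submitted.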
Examples
TensorFlow - UNet
UNet is known for its pixel-level predictions and is widely applied to tasks such as medical image biomarker detection, instance segmentation, and depth estimation. Unlike simple classification models, UNet takes an input image and generates output prediction images, which requires higher transfer bandwidth and, in turn, increases the communication time between the host and the accelerator.
In this example, we will demonstrate a depth estimation task following the official Keras Tutorial. The model has been trained on the NYU Depth V2 dataset.
Below are step-by-step examples for efficiently running UNet inference on the RBLN NPU. To begin, run the following bash script to obtain the dataset:
| curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000000.tar
curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000001.tar
tar -xf val-000000.tar
tar -xf val-000001.tar
|
In this tutorial, we use the first 100 samples for the test:
| from pathlib import Path

import cv2
import h5py
import numpy as np
import tensorflow as tf


class NYUDepthV2Dataset:
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val" / "official"
        self.samples = sorted(image_root.glob("*.h5"))

    def __getitem__(self, index):
        # Close the HDF5 file as soon as the RGB array has been read.
        with h5py.File(self.samples[index], "r") as h5f:
            # Move the first axis last, i.e. (C, H, W) -> (H, W, C).
            image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
        image = cv2.resize(image, (512, 512))
        # convert_image_dtype rescales integer inputs to [0, 1].
        image = tf.image.convert_image_dtype(image, tf.float32)
        return image


dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]
|
Once the sample data is prepared, we can simply compile the model as below:
| import rebel
import tensorflow as tf
import numpy as np
from huggingface_hub import from_pretrained_keras

model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
    tf.function(lambda x: model(x)),
    input_info=[("x", [1, 512, 512, 3], np.float32)],
)
|
By creating the async runtime, we can run it concurrently as below:
| import asyncio

runtime = compiled_model.create_async_runtime()


async def main() -> list:
    tasks = []
    for sample in samples:
        # Add the batch dimension: (512, 512, 3) -> (1, 512, 512, 3).
        sample = np.expand_dims(sample.numpy(), axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)
    # Wait for every submitted inference; results keep submission order.
    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())
|
Below is the complete code that includes all the steps above to execute UNet using our asynchronous runtime APIs:
| import asyncio
from pathlib import Path

import cv2
import h5py
import numpy as np
import rebel
import tensorflow as tf
from huggingface_hub import from_pretrained_keras


class NYUDepthV2Dataset:
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val" / "official"
        self.samples = sorted(image_root.glob("*.h5"))

    def __getitem__(self, index):
        # Close the HDF5 file as soon as the RGB array has been read.
        with h5py.File(self.samples[index], "r") as h5f:
            # Move the first axis last, i.e. (C, H, W) -> (H, W, C).
            image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
        image = cv2.resize(image, (512, 512))
        # convert_image_dtype rescales integer inputs to [0, 1].
        image = tf.image.convert_image_dtype(image, tf.float32)
        return image


# Prepare the first 100 samples.
dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

# Compile the model for a fixed input shape.
model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
    tf.function(lambda x: model(x)),
    input_info=[("x", [1, 512, 512, 3], np.float32)],
)

runtime = compiled_model.create_async_runtime()


async def main() -> list:
    tasks = []
    for sample in samples:
        # Add the batch dimension: (512, 512, 3) -> (1, 512, 512, 3).
        sample = np.expand_dims(sample.numpy(), axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)
    # Wait for every submitted inference; results keep submission order.
    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())
|
PyTorch - YOLOv8
YOLOv8 is a popular object detection model. In this example, we will use YOLOv8 with the COCO dataset. To access the dataset, please refer to the link.
Let's assume that the images and the annotation file for the 2017 validation set are stored in the current directory:
| images: ./val2017
annotation file: ./annotations/instances_val2017.json
|
Below is the complete code to execute YOLOv8 using our asynchronous runtime APIs:
| import asyncio
from pathlib import Path

import numpy as np
import rebel
import torch
from PIL import Image
from pycocotools.coco import COCO
from ultralytics import YOLO
from ultralytics.data.augment import LetterBox


class COCO2017Dataset:
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val2017"
        self.coco_gt = COCO(dataset_root / "annotations" / "instances_val2017.json")
        # Each sample is an (image_id, image_path) pair.
        self.samples: list[tuple[int, Path]] = []
        for image_id in self.coco_gt.getImgIds():
            self.samples.append(
                (
                    image_id,
                    image_root / self.coco_gt.imgs[image_id]["file_name"],
                )
            )

    def __getitem__(self, index):
        image_id, image_path = self.samples[index]
        # COCO contains some grayscale images; force three channels.
        image = Image.open(image_path).convert("RGB")
        letterbox_transform = LetterBox()
        array = np.asarray(image, dtype=np.float32)
        array = letterbox_transform(image=array)
        # (H, W, C) -> (C, H, W)
        array = np.transpose(array, [2, 0, 1])
        array = np.ascontiguousarray(array)
        # Scale pixel values to [0, 1].
        array /= 255
        return array


# Prepare the first 100 samples.
dataset = COCO2017Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

# Run one forward pass, then trace and compile the model.
model = YOLO("yolov8n.pt").model.eval()
model(torch.zeros([1, 3, 640, 640], dtype=torch.float32))
scripted_model = torch.jit.trace(model, torch.zeros([1, 3, 640, 640], dtype=torch.float32))
compiled_model = rebel.compile_from_torchscript(scripted_model)

runtime = compiled_model.create_async_runtime()


async def main() -> list:
    tasks = []
    for sample in samples:
        # Add the batch dimension: (3, 640, 640) -> (1, 3, 640, 640).
        sample = np.expand_dims(sample, axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)
    # Wait for every submitted inference; results keep submission order.
    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())
|
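Both examples above submit all 100 samples at once. If an application needs to cap how many requests are in flight, for instance to bound host memory, one common asyncio approach is to guard each call with an `asyncio.Semaphore`. The sketch below is an illustration under assumptions: `fake_async_run` is a hypothetical stand-in rather than the RBLN API, `MAX_IN_FLIGHT` is an arbitrary value, and whether the runtime already queues or limits requests internally is not covered here.

```python
import asyncio

MAX_IN_FLIGHT = 4  # hypothetical limit; tune for the model and host memory

in_flight = 0
peak_in_flight = 0


async def fake_async_run(x: int) -> int:
    # Hypothetical stand-in for an inference call that tracks concurrent callers.
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.005)  # pretend inference latency
    in_flight -= 1
    return x


async def bounded_run(x: int, limit: asyncio.Semaphore) -> int:
    # At most MAX_IN_FLIGHT callers get past this point at the same time.
    async with limit:
        return await fake_async_run(x)


async def main() -> list:
    limit = asyncio.Semaphore(MAX_IN_FLIGHT)
    return await asyncio.gather(*(bounded_run(x, limit) for x in range(20)))


outputs = asyncio.run(main())
print(f"peak in flight: {peak_in_flight}")
```

The results still come back in submission order, since `asyncio.gather` is what collects them. The semaphore only delays when each call starts.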