동시실행(Concurrent Processing)
가속기의 모든 연산 자원의 사용율을 최대로 유지하는 것은 매우 어려운 일입니다. 각 추론 태스크를 처리하기 위해서는 기본적으로 약간의 대기시간이 포함될 수 밖에 없습니다. 이러한 현상을 보다 정확히 이해하기 위해 가속기에서 최종 추론 결과를 얻는 과정을 살펴봅니다:
- 사전처리 된(pre-processed) 입력 데이터 준비 - 호스트 (CPU)
- 가속기에 입력 데이터 전송 - 호스트-가속기 입출력 (DMA)
- 가속기에서 워크로드 수행 - 가속기 (NPU)
- 가속기로부터 출력 데이터 가져오기 - 가속기-호스트 입출력 (DMA)
호스트 CPU는 1단계에서, 가속기 NPU는 3단계에서 활발하게 동작하고, 만일 가속기 연산 중 일부 연산을 호스트에서 처리하는 것이 더 효율적이라고 판단되는 경우 3단계 에서도 추가적인 가속기-호스트 입출력 DMA가 발생할 수 있습니다.
RBLN SDK는 다중 추론 시나리오에서 동시성(concurrency) 개념을 구현함으로써 앞서 언급한 각 단계의 작업을 중첩시킬 수 있으며, 이를 통해 각 추론 작업 사이의 유휴시간을 줄이고 가속기 자원의 사용율을 높일 수 있습니다.
RBLN SDK는 직관적이고 사용자 친화적인 비동기 API 형태로 동시성 기능을 제공합니다. 이 튜토리얼에서는 텐서플로우, 파이토치 예제를 통해 RBLN SDK의 비동기 API를 사용하는 방법을 배울 수 있습니다.
사전 준비
시작하기에 앞서 아래의 파이썬 패키지들이 설치되어 있는지 확인합니다:
사용 방법
RBLN 컴파일러를 사용하여 딥러닝 모델을 성공적으로 컴파일했다면 RBLNCompiledModel
오브젝트를 얻을 수 있고, 비동기모드의 RBLN 런타임을 통해 컴파일된 모델을 로드할 수 있습니다:
| import rebel
compiled_model: rebel.RBLNCompiledModel = rebel.compile_from_torchscript(model)
runtime: rebel.AsyncRuntime = compiled_model.create_async_runtime()
|
사전에 컴파일된 모델이 로컬 저장소에 저장되어있는 경우, 컴파일된 *.rbln
모델의 경로를 입력하여 바로 비동기 런타임 모듈을 생성할 수도 있습니다:
| import rebel
runtime = rebel.AsyncRuntime("/path/to/saved_file.rbln")
|
사용자의 프로그램에서 네이티브 asyncio를 이벤트 루프로 활용하고 있다면, 파이써닉(Pythonic) 프로그래밍을 위해 PEP-492 async and await syntax를 준수하는AsyncRuntime.async_run
을 사용하면 됩니다:
| output = await runtime.async_run(input_)
|
만약 호출자(caller) 프로그램이 PyQT 또는 gevent와 같은 유형의 이벤트 루프에서 실행되는 경우, 로직에 따라 호출(invocation) 및 조인(join)을 수동으로 관리해야 합니다. 이를 위해 RBLN SDK는 run()
함수의 간단한 비동기 버전인 AsyncRuntime.run()
을 함께 제공합니다.
| task = runtime.run(input_)
# Do other jobs...
output = task.wait()
|
성능 최적화 고려사항
RBLN SDK는 AsyncRuntime의 parallel
매개변수를 통해 입력 데이터 준비 과정을 유연하게 관리할 수 있습니다. 단일 스레드 준비(parallel=1
)가 기본 동작이지만, 특정 상황에서는 두 개의 스레드를 사용하는 이중 버퍼링(parallel=2
)을 활성화하여 성능을 향상시킬 수 있습니다.
이중 버퍼링을 사용하면 NPU가 현재 입력을 처리하는 동안 다른 스레드가 다음 입력을 준비할 수 있어, 다음과 같은 상황에서 유용할 수 있습니다:
- 호스트에서의 입력 전처리 및 데이터 준비가 계산 집약적인 경우
- 모델의 추론 시간이 입력 준비 시간과 비슷한 경우
- DMA 전송 시간이 상당한 경우:
- 호스트와 NPU 간에 큰 데이터 전송이 필요한 대용량 입출력 텐서
- 여러 번의 DMA 전송이 필요한 다중 입출력 모델
그러나 이중 버퍼링이 항상 성능 향상을 보장하지는 않으며 일부 모델에서는 안정성에 영향을 줄 수 있으므로, 사용자는 자신의 특정 사용 사례에 대해 벤치마크를 수행해야 합니다.
예제
텐서플로우 - UNet
UNet은 픽셀 수준 예측(pixel-level prediction)에서 많이 쓰이는 모델 중 하나이며, 의료 영상 바이오마커 검출(medical image biomarker detection), 인스턴스 세그멘테이션(instance segmentation), 깊이 추정(depth estimation) 등과 같은 응용에서 많이 사용됩니다. UNet은 단순한 분류 모델들과는 다르게 입력 이미지로부터 픽셀 수준의 출력 이미지를 생성하기 때문에, 더 높은 입출력 대역폭을 필요로 하여 호스트와 가속기 간의 통신 시간이 늘어납니다.
이 예제에서는 공식 케라스 튜토리얼에서 제공하는, NYU Depth V2 데이터셋으로 학습 된 깊이추정 응용 태스크를 사용합니다.
다음의 스크립트를 통해 테스트를 위한 데이터셋을 다운로드할 수 있습니다:
| curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000000.tar
curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000001.tar
tar -xf val-000000.tar
tar -xf val-000001.tar
|
본 튜토리얼 에서는 100개의 샘플데이터를 사용합니다:
| from pathlib import Path
import cv2
import h5py
import numpy as np
import tensorflow as tf
class NYUDepthV2Dataset():
def __init__(self, dataset_root: Path):
image_root = dataset_root / "val" / "official"
self.samples = sorted(image_root.glob("*.h5"))
def __getitem__(self, index):
h5f = h5py.File(self.samples[index], "r")
image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
image = cv2.resize(image, (512, 512))
image = tf.image.convert_image_dtype(image, tf.float32)
return image
dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]
|
샘플 데이터가 준비되면 아래와 같이 모델을 컴파일 할 수 있습니다:
| import rebel
import tensorflow as tf
import numpy as np
from huggingface_hub import from_pretrained_keras
model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
tf.function(lambda x : model(x)),
input_info=[("x", [1, 512, 512, 3], np.float32)],
)
|
비동기 런타임 모듈을 생성하여 동시성 추론을 수행할 수 있습니다:
| import asyncio
runtime = rebel.AsyncRuntime(compiled_model, parallel=2)
async def main() -> None:
tasks = []
for sample in samples:
sample = np.expand_dims(sample.numpy(), axis=0)
task = runtime.async_run(sample)
tasks.append(task)
logits = await asyncio.gather(*tasks)
return logits
logits = asyncio.run(main())
|
지금까지의 튜토리얼 내용을 하나의 파일로 모은 코드를 제공합니다.
아래의 스크립트를 통해 비동기 런타임 API를 기반으로 UNet
모델의 동시성 추론을 수행할 수 있습니다.
| import cv2
import h5py
import numpy as np
import rebel
import tensorflow as tf
import asyncio
from pathlib import Path
from huggingface_hub import from_pretrained_keras
class NYUDepthV2Dataset():
def __init__(self, dataset_root: Path):
image_root = dataset_root / "val" / "official"
self.samples = sorted(image_root.glob("*.h5"))
def __getitem__(self, index):
h5f = h5py.File(self.samples[index], "r")
image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
image = cv2.resize(image, (512, 512))
image = tf.image.convert_image_dtype(image, tf.float32)
return image
dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]
model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
tf.function(lambda x : model(x)),
input_info=[("x", [1, 512, 512, 3], np.float32)],
)
runtime = rebel.AsyncRuntime(compiled_model, parallel=2)
async def main() -> None:
tasks = []
for sample in samples:
sample = np.expand_dims(sample.numpy(), axis=0)
task = runtime.async_run(sample)
tasks.append(task)
logits = await asyncio.gather(*tasks)
return logits
logits = asyncio.run(main())
|
파이토치 - YOLOv8
YOLOv8은 물체 검출(object detection) 응용 태스크에서 많이 사용되는 모델 중 하나이며, 이 예제에서는 COCO 데이터셋으로 학습된 YOLOv8 모델을 이용하여 물체 검출을 수행합니다. COCO 데이터셋은 link를 통해 다운로드할 수 있습니다.
2017/validation set
의 모든 이미지와 어노테이션(annotation) 파일들이 현재 디렉토리에 준비되어 있다고 가정합니다:
| images: ./val2017
annotation file: ./annotations/instances_val2017.json
|
아래의 스크립트를 통해 비동기 런타임 API를 기반으로 YOLOv8
모델의 동시성 추론을 수행할 수 있습니다.
| from pathlib import Path
import numpy as np
import torch
import rebel
from PIL import Image
from pycocotools.coco import COCO
from ultralytics import YOLO
from ultralytics.data.augment import LetterBox
class COCO2017Dataset():
def __init__(self, dataset_root: Path):
image_root = dataset_root / "val2017"
self.coco_gt = COCO(dataset_root / "annotations" / "instances_val2017.json")
# Create sample
self.samples: typing.List[Sample] = []
for image_id in self.coco_gt.getImgIds():
self.samples.append(
(
image_id,
image_root / self.coco_gt.imgs[image_id]["file_name"],
)
)
def __getitem__(self, index):
image_id, image_path = self.samples[index]
image = Image.open(image_path)
letterbox_transform = LetterBox()
array = np.asarray(image, dtype=np.float32)
array = letterbox_transform(image=array)
array = np.transpose(array, [2, 0, 1])
array = np.ascontiguousarray(array)
array /= 255
return array
dataset = COCO2017Dataset(Path("."))
samples = [dataset[i] for i in range(100)]
model = YOLO(f"yolov8n.pt").model.eval()
model(torch.zeros([1, 3, 640, 640], dtype=torch.float32))
scripted_model = torch.jit.trace(model, torch.zeros([1, 3, 640, 640], dtype=torch.float32))
compiled_model = rebel.compile_from_torchscript(scripted_model)
import asyncio
runtime = rebel.AsyncRuntime(compiled_model)
async def main() -> None:
tasks = []
for sample in samples:
sample = np.expand_dims(sample, axis=0)
task = runtime.async_run(sample)
tasks.append(task)
logits = await asyncio.gather(*tasks)
return logits
logits = asyncio.run(main())
|