콘텐츠로 이동

동시실행(Concurrent Processing)

가속기의 모든 연산 자원의 사용율을 최대로 유지하는 것은 매우 어려운 일입니다. 각 추론 태스크를 처리하기 위해서는 기본적으로 약간의 대기시간이 포함될 수 밖에 없습니다. 이러한 현상을 보다 정확히 이해하기 위해 가속기에서 최종 추론 결과를 얻는 과정을 살펴봅니다:

  1. 사전처리 된(pre-processed) 입력 데이터 준비 - 호스트 (CPU)
  2. 가속기에 입력 데이터 전송 - 호스트-가속기 입출력 (DMA)
  3. 가속기에서 워크로드 수행 - 가속기 (NPU)
  4. 가속기로부터 출력 데이터 가져오기 - 가속기-호스트 입출력 (DMA)

호스트 CPU는 1단계에서, 가속기 NPU는 3단계에서 활발하게 동작하고, 만일 가속기 연산 중 일부 연산을 호스트에서 처리하는 것이 더 효율적이라고 판단되는 경우 3단계 에서도 추가적인 가속기-호스트 입출력 DMA가 발생할 수 있습니다.

RBLN SDK는 다중 추론 시나리오에서 동시성(concurrency) 개념을 구현함으로써 앞서 언급한 각 단계의 작업을 중첩시킬 수 있으며, 이를 통해 각 추론 작업 사이의 유휴시간을 줄이고 가속기 자원의 사용율을 높일 수 있습니다.

RBLN SDK는 직관적이고 사용자 친화적인 비동기 API 형태로 동시성 기능을 제공합니다. 이 튜토리얼에서는 텐서플로우, 파이토치 예제를 통해 RBLN SDK의 비동기 API를 사용하는 방법을 배울 수 있습니다.

사전 준비

시작하기에 앞서 아래의 파이썬 패키지들이 설치되어 있는지 확인합니다:

사용 방법

RBLN 컴파일러를 사용하여 딥러닝 모델을 성공적으로 컴파일했다면 RBLNCompiledModel 오브젝트를 얻을 수 있고, 비동기모드의 RBLN 런타임을 통해 컴파일된 모델을 로드할 수 있습니다:

1
2
3
4
import rebel

compiled_model: rebel.RBLNCompiledModel = rebel.compile_from_torchscript(model)
runtime: rebel.AsyncRuntime = compiled_model.create_async_runtime()

사전에 컴파일된 모델이 로컬 저장소에 저장되어있는 경우, 컴파일된 *.rbln 모델의 경로를 입력하여 바로 비동기 런타임 모듈을 생성할 수도 있습니다:

1
2
3
import rebel

runtime = rebel.AsyncRuntime("/path/to/saved_file.rbln")

사용자의 프로그램에서 네이티브 asyncio를 이벤트 루프로 활용하고 있다면, 파이써닉(Pythonic) 프로그래밍을 위해 PEP-492 async and await syntax를 준수하는AsyncRuntime.async_run을 사용하면 됩니다:

output = await runtime.async_run(input_)

만약 호출자(caller) 프로그램이 PyQT 또는 gevent와 같은 유형의 이벤트 루프에서 실행되는 경우, 로직에 따라 호출(invocation) 및 조인(join)을 수동으로 관리해야 합니다. 이를 위해 RBLN SDK는 run() 함수의 간단한 비동기 버전인 AsyncRuntime.run() 을 함께 제공합니다.

1
2
3
4
5
task = runtime.run(input_)

# Do other jobs...

output = task.wait()

예제

텐서플로우 - UNet

UNet은 픽셀 수준 예측(pixel-level prediction)에서 많이 쓰이는 모델 중 하나이며, 의료 영상 바이오마커 검출(medical image biomarker detection), 인스턴스 세그멘테이션(instance segmentation), 깊이 추정(depth estimation) 등과 같은 응용에서 많이 사용됩니다. UNet은 단순한 분류 모델들과는 다르게 입력 이미지로부터 픽셀 수준의 출력 이미지를 생성하기 때문에, 더 높은 입출력 대역폭을 필요로 하여 호스트와 가속기 간의 통신 시간이 늘어납니다.

이 예제에서는 공식 케라스 튜토리얼에서 제공하는, NYU Depth V2 데이터셋으로 학습 된 깊이추정 응용 태스크를 사용합니다.

다음의 스크립트를 통해 테스트를 위한 데이터셋을 다운로드할 수 있습니다:

1
2
3
4
curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000000.tar
curl -L -O https://huggingface.co/datasets/sayakpaul/nyu_depth_v2/resolve/main/data/val-000001.tar
tar -xf val-000000.tar
tar -xf val-000001.tar

본 튜토리얼 에서는 100개의 샘플데이터를 사용합니다:

from pathlib import Path

import cv2
import h5py
import numpy as np
import tensorflow as tf

class NYUDepthV2Dataset():
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val" / "official"
        self.samples = sorted(image_root.glob("*.h5"))

    def __getitem__(self, index):
        h5f = h5py.File(self.samples[index], "r")
        image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
        image = cv2.resize(image, (512, 512))
        image = tf.image.convert_image_dtype(image, tf.float32)
        return image

dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

샘플 데이터가 준비되면 아래와 같이 모델을 컴파일 할 수 있습니다:

import rebel
import tensorflow as tf
import numpy as np

from huggingface_hub import from_pretrained_keras

model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
    tf.function(lambda x : model(x)),
    input_info=[("x", [1, 512, 512, 3], np.float32)],
)

비동기 런타임 모듈을 생성하여 동시성 추론을 수행할 수 있습니다:

import asyncio

runtime = compiled_model.create_async_runtime()

async def main() -> None:
    tasks = []

    for sample in samples:
        sample = np.expand_dims(sample.numpy(), axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)

    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())

지금까지의 튜토리얼 내용을 하나의 파일로 모은 코드를 제공합니다. 아래의 스크립트를 통해 비동기 런타임 API를 기반으로 UNet 모델의 동시성 추론을 수행할 수 있습니다.

import cv2
import h5py
import numpy as np
import rebel
import tensorflow as tf
import asyncio

from pathlib import Path
from huggingface_hub import from_pretrained_keras


class NYUDepthV2Dataset():
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val" / "official"
        self.samples = sorted(image_root.glob("*.h5"))

    def __getitem__(self, index):
        h5f = h5py.File(self.samples[index], "r")
        image = np.transpose(np.array(h5f["rgb"]), (1, 2, 0))
        image = cv2.resize(image, (512, 512))
        image = tf.image.convert_image_dtype(image, tf.float32)
        return image


dataset = NYUDepthV2Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

model = from_pretrained_keras("keras-io/deeplabv3p-resnet50")
compiled_model = rebel.compile_from_tf_function(
    tf.function(lambda x : model(x)),
    input_info=[("x", [1, 512, 512, 3], np.float32)],
)


runtime = compiled_model.create_async_runtime()


async def main() -> None:
    tasks = []

    for sample in samples:
        sample = np.expand_dims(sample.numpy(), axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)

    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())

파이토치 - YOLOv8

YOLOv8은 물체 검출(object detection) 응용 태스크에서 많이 사용되는 모델 중 하나이며, 이 예제에서는 COCO 데이터셋으로 학습된 YOLOv8 모델을 이용하여 물체 검출을 수행합니다. COCO 데이터셋은 link를 통해 다운로드할 수 있습니다.

2017/validation set의 모든 이미지와 어노테이션(annotation) 파일들이 현재 디렉토리에 준비되어 있다고 가정합니다:

images: ./val2017
annotation file: ./annotations/instances_val2017.json

아래의 스크립트를 통해 비동기 런타임 API를 기반으로 YOLOv8 모델의 동시성 추론을 수행할 수 있습니다.

from pathlib import Path

import numpy as np
import torch
import rebel
from PIL import Image
from pycocotools.coco import COCO
from ultralytics import YOLO
from ultralytics.data.augment import LetterBox

class COCO2017Dataset():
    def __init__(self, dataset_root: Path):
        image_root = dataset_root / "val2017"
        self.coco_gt = COCO(dataset_root / "annotations" / "instances_val2017.json")

        # Create sample
        self.samples: typing.List[Sample] = []
        for image_id in self.coco_gt.getImgIds():
            self.samples.append(
                (
                    image_id,
                    image_root / self.coco_gt.imgs[image_id]["file_name"],
                )
            )

    def __getitem__(self, index):
        image_id, image_path = self.samples[index]
        image = Image.open(image_path)
        letterbox_transform = LetterBox()
        array = np.asarray(image, dtype=np.float32)
        array = letterbox_transform(image=array)
        array = np.transpose(array, [2, 0, 1])
        array = np.ascontiguousarray(array)
        array /= 255
        return array

dataset = COCO2017Dataset(Path("."))
samples = [dataset[i] for i in range(100)]

model = YOLO(f"yolov8n.pt").model.eval()
model(torch.zeros([1, 3, 640, 640], dtype=torch.float32))
scripted_model = torch.jit.trace(model, torch.zeros([1, 3, 640, 640], dtype=torch.float32))
compiled_model = rebel.compile_from_torchscript(scripted_model)

import asyncio

runtime = compiled_model.create_async_runtime()

async def main() -> None:
    tasks = []

    for sample in samples:
        sample = np.expand_dims(sample, axis=0)
        task = runtime.async_run(sample)
        tasks.append(task)

    logits = await asyncio.gather(*tasks)
    return logits


logits = asyncio.run(main())