콘텐츠로 이동

Disaggregated Encoder

Disaggregated Encoder 베타

Disaggregated Encoder는 현재 베타 단계이며 개발이 진행 중입니다. 릴리스마다 설정 항목, connector 동작, proxy 인터페이스가 변경될 수 있으며, 하위 호환성은 보장되지 않습니다. 프로덕션 환경에서의 사용은 아직 권장하지 않습니다. 안정화 및 정식 릴리스를 위해 테스트와 피드백을 환영합니다.

1. 개요

멀티모달 모델에서 visual encoder와 언어 모델은 연산 및 스케줄링 특성이 크게 다릅니다. encoder는 짧고 버스트성이 강한 단계로 동작하므로, 같은 NPU에 배치할 경우 언어 모델의 prefill·decode가 사용할 자원을 충분히 활용하지 못합니다.

Disaggregated Encoder (EC disaggregation)는 이 두 단계를 별개의 vLLM 프로세스로 분리합니다.

  • Encoder 인스턴스는 visual encoder만 실행합니다.
  • PD (Prefill+Decode) 인스턴스는 언어 모델을 실행합니다.
  • 인코딩된 멀티모달 피처는 EC connector를 통해 encoder에서 PD로 전달됩니다.
  • 앞단의 가벼운 proxy가 멀티모달 아이템을 encoder로 분배(fan-out)하고 원본 요청을 PD로 전달합니다.

이를 통해 encoder는 적은 NPU를 사용하는 인스턴스를 여러 개 실행해 수평으로 확장하고, LLM 단계는 별도의 NPU 집합에서 전담하도록 구성할 수 있습니다.

1.1 현재 지원 범위

  • 토폴로지: N encoder + 1 PD (prefill과 decode 결합).
  • 모델: Qwen3-VL 계열만 지원합니다.
  • Connector: RblnECNixlConnector가 encoder와 PD 사이의 encoder 캐시 전송을 담당합니다.

2. 아키텍처

Disaggregated encoder 요청 흐름

요청 흐름:

  1. Proxy는 수신한 chat 요청에서 모든 image / audio 아이템을 추출합니다.
  2. 텍스트를 제외한 각 멀티모달 항목당 하나의 요청을 round-robin 방식으로 encoder 클러스터에 전송합니다. encoder에서 계산된 visual feature는 connector를 통해 PD로 적재됩니다.
  3. Proxy는 원본 요청을 PD로 전달합니다. PD는 encoder 캐시를 가져와 prefill과 decode를 수행합니다.

3. 사전 준비

  • 시스템 요구 사항:
  • 필수 패키지:
    • RBLN Compiler
    • optimum-rbln (Qwen3-VL 지원 버전)
    • vllm-rbln v0.10.3 이상 (RblnECNixlConnector가 도입된 버전).
    • nixlRblnECNixlConnector가 encoder 캐시 전송에 사용합니다. pip install nixl로 설치하세요.
  • NPU: encoder 인스턴스당 1 NPU + PD 인스턴스 8 NPU. 다음 예시는 8 encoder + 1 PD = 총 16 NPU를 사용합니다.
  • 컴파일된 모델: Qwen3-VL 컴파일 결과를 encoder와 PD 양쪽에서 그대로 재사용합니다. 별도의 encoder 전용 컴파일은 필요하지 않습니다. 다음 컴파일 단계를 참고하세요.

4. 실행

이 섹션은 전체 설정 과정을 단계별로 안내합니다. 다음 순서로 진행하세요. 포트와 디바이스를 설정한 뒤 모델을 한 번 컴파일하고, PD → encoder → proxy 순으로 실행한 다음 요청을 전송합니다.

4.1 포트 및 디바이스 설정

각 인스턴스는 OpenAI 호환 서빙을 위한 자체 HTTP 포트를 노출하며, encoder ↔ PD 사이의 통신에는 한 개의 추가 소켓을 사용합니다. 환경에서 사용 가능한 임의의 포트를 지정할 수 있지만, 다음 값들은 컴포넌트 간에 반드시 일치시켜야 합니다.

컴포넌트 설정 항목 일치 대상
Proxy --port client가 요청을 보내는 URL
PD --port proxy--decode-servers-urls 항목
Encoder --port proxy--encode-servers-urls 항목
PD pull_host / pull_port
(ec-transfer-config)
모든 encoder의 llm_host / llm_pull_port

이 가이드 전반의 예시 명령은 다음 값을 사용합니다. 환경에서 사용 가능한 포트로 교체하세요.

컴포넌트 HTTP 포트 Connector 포트 NPU 디바이스
Proxy 19000 n/a
PD 19100 19500 0,1,2,3,4,5,6,7
Encoder 0 19101 19500 8
Encoder 1 19102 19500 9
Encoder 2 19103 19500 10
... ... ... ...
Encoder 7 19108 19500 15

Connector 포트는 PD(pull_port로 바인드)와 모든 encoder(llm_pull_port로 접속) 모두 동일한 값입니다.

4.2 모델 컴파일

이 가이드에서는 Qwen3-VL-8B-Instruct를 예시로 사용합니다. optimum-rbln으로 한 번 컴파일한 결과를 encoder와 PD 인스턴스 양쪽에서 그대로 재사용합니다. 아래의 컴파일 설정은 이 가이드 뒷부분의 토폴로지에 맞춰져 있습니다. PD 측에서는 언어 모델이 8 NPU(tensor_parallel_size=8)에서 실행되고, encoder 측에서는 visual encoder가 1 NPU(visual.tensor_parallel_size=1)에서 실행됩니다.

from optimum.rbln import RBLNQwen3VLForConditionalGeneration

model_id = "Qwen/Qwen3-VL-8B-Instruct"
model = RBLNQwen3VLForConditionalGeneration.from_pretrained(
    model_id,
    export=True,
    rbln_config={
        "visual": {
            "max_seq_lens": 5440,
            "tensor_parallel_size": 1,
        },
        "tensor_parallel_size": 8,
        "max_seq_len": 32_768,
        "batch_size": 8,
        "decoder_batch_sizes": [8],
    },
)
model.save_pretrained("Qwen3-VL-8B-Instruct")

이후 단계의 명령들은 컴파일된 디렉토리 이름이 Qwen3-VL-8B-Instruct이고, vllm serve를 실행하는 작업 디렉토리에서 접근 가능하다고 가정합니다.

4.3 PD 인스턴스 실행

PD 인스턴스는 표준 vllm serve 프로세스에 --ec-transfer-config를 추가한 프로세스입니다. 이 설정으로 RblnECNixlConnectorec_consumer 역할로 선택합니다. PD는 pull_host / pull_port에서 encoder의 접속을 대기합니다.

RBLN_DEVICES=0,1,2,3,4,5,6,7 \
vllm serve Qwen3-VL-8B-Instruct \
    --port 19100 \
    --mm-processor-kwargs '{"max_pixels": 802816}' \
    --ec-transfer-config '{
        "ec_connector": "RblnECNixlConnector",
        "ec_role": "ec_consumer",
        "ec_connector_extra_config": {
            "pull_host": "0.0.0.0",
            "pull_port": 19500
        }
    }'
전체 launcher 스크립트: serve_ec_llm.sh
#!/usr/bin/env bash
# Start the EC disaggregation PD instance using RblnECNixlConnector.
#
# Usage:
#   bash serve_ec_llm.sh
#
# The PD listens for encoder metadata. Start this BEFORE the encoders.

MODEL_ID="${1:-Qwen3-VL-8B-Instruct}"
PORT="${2:-19100}"

# Connector bind address (encoders connect here)
PULL_HOST="${PULL_HOST:-0.0.0.0}"
PULL_PORT="${PULL_PORT:-19500}"

# PD devices
LLM_DEVICES="${LLM_DEVICES:-0,1,2,3,4,5,6,7}"

export RBLN_DEVICES=$LLM_DEVICES
exec vllm serve "$MODEL_ID" \
    --port "$PORT" \
    --mm-processor-kwargs '{"max_pixels": 802816}' \
    --ec-transfer-config "{
        \"ec_connector\": \"RblnECNixlConnector\",
        \"ec_role\": \"ec_consumer\",
        \"ec_connector_extra_config\": {
            \"pull_host\": \"$PULL_HOST\",
            \"pull_port\": $PULL_PORT
        }
    }"

encoder를 실행하기 전에 PD가 정상 상태가 될 때까지 대기하세요.

until curl -sf http://127.0.0.1:19100/health > /dev/null; do sleep 2; done

4.4 encoder 인스턴스 실행

각 encoder는 별도의 vllm serve 프로세스로, ec_role="ec_producer"로 설정하고 PD의 pull 엔드포인트를 llm_host / llm_pull_port에 지정합니다. RBLN_DEVICES로 encoder마다 1 NPU를 할당합니다.

다음 예시는 디바이스 8..15에서 8개의 encoder를 실행하며, 각각 포트 19101..19108에서 동작합니다.

for i in $(seq 0 7); do
    RBLN_DEVICES=$((8 + i)) vllm serve Qwen3-VL-8B-Instruct \
        --port $((19101 + i)) \
        --mm-processor-kwargs '{"max_pixels": 802816}' \
        --ec-transfer-config '{
            "ec_connector": "RblnECNixlConnector",
            "ec_role": "ec_producer",
            "ec_connector_extra_config": {
                "llm_host": "127.0.0.1",
                "llm_pull_port": 19500
            }
        }' &
done
전체 launcher 스크립트: serve_ec_encoder.sh
#!/usr/bin/env bash
# Start EC disaggregation encoder(s) using RblnECNixlConnector.
#
# Usage:
#   NUM_ENCODERS=8 bash serve_ec_encoder.sh
#
# Each encoder gets 1 device and connects to the PD's pull port.
# Start the PD FIRST with: bash serve_ec_llm.sh

MODEL_ID="${1:-Qwen3-VL-8B-Instruct}"
BASE_PORT="${2:-19101}"
BASE_DEVICE="${BASE_DEVICE:-8}"
NUM_ENCODERS="${NUM_ENCODERS:-8}"

# PD pull endpoint (where encoders connect)
LLM_HOST="${LLM_HOST:-127.0.0.1}"
LLM_PULL_PORT="${LLM_PULL_PORT:-19500}"

# Device list: either explicit or auto-generated
if [ -n "$RBLN_DEVICE_LIST" ]; then
    IFS=',' read -ra DEVICES <<< "$RBLN_DEVICE_LIST"
    NUM_ENCODERS="${#DEVICES[@]}"
else
    DEVICES=()
    for i in $(seq 0 $((NUM_ENCODERS - 1))); do
        DEVICES+=($((BASE_DEVICE + i)))
    done
fi

launch_encoder() {
    local idx=$1
    local device=${DEVICES[$idx]}
    local port=$((BASE_PORT + idx))

    echo "Starting encoder $idx (device=$device, port=$port, push→$LLM_HOST:$LLM_PULL_PORT)"
    RBLN_DEVICES=$device vllm serve "$MODEL_ID" \
        --port "$port" \
        --mm-processor-kwargs '{"max_pixels": 802816}' \
        --ec-transfer-config "{
            \"ec_connector\": \"RblnECNixlConnector\",
            \"ec_role\": \"ec_producer\",
            \"ec_connector_extra_config\": {
                \"llm_host\": \"$LLM_HOST\",
                \"llm_pull_port\": $LLM_PULL_PORT
            }
        }" &
}

for i in $(seq 0 $((NUM_ENCODERS - 1))); do
    launch_encoder "$i"
done

echo "Launched $NUM_ENCODERS encoder(s). Press Ctrl+C to stop all."
trap 'kill $(jobs -p) 2>/dev/null; wait' INT TERM
wait

4.5 proxy 실행

Proxy는 OpenAI 호환 프런트엔드로, 멀티모달 아이템을 encoder 클러스터로 분배하고 원본 요청을 PD로 전달합니다. /v1/chat/completions, /v1/models, /health를 노출합니다. 전체 소스는 vllm-rbln 저장소에 있습니다.

1
2
3
4
5
python client_ec_disaggregated.py \
    --host 0.0.0.0 \
    --port 19000 \
    --encode-servers-urls "http://127.0.0.1:19101,http://127.0.0.1:19102,http://127.0.0.1:19103,http://127.0.0.1:19104,http://127.0.0.1:19105,http://127.0.0.1:19106,http://127.0.0.1:19107,http://127.0.0.1:19108" \
    --decode-servers-urls "http://127.0.0.1:19100"
전체 proxy 스크립트: client_ec_disaggregated.py
#!/usr/bin/env python3
"""
EC Disaggregated Encoder Proxy

Routes OpenAI-compatible "/v1/chat/completions" requests to two clusters
for EC (Encoder Cache) disaggregation:

  * encode  (encoder — runs the visual encoder, stages encoder cache)
  * decode  (PD — receives encoder cache, runs prefill + decode)

For multimodal input we:
    1. Extract every image/audio item from the request.
    2. Fire N concurrent requests to the encoder cluster (one per MM
       item, with all text removed).
    3. Wait for all of them to succeed (encoder caches are now staged
       on the PD side).
    4. Forward the original request to a decode (PD) server.
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import random
import uuid
from collections.abc import AsyncIterator

import aiohttp
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse

logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s %(levelname)s: %(message)s"
)
logger = logging.getLogger("ec_proxy")

app = FastAPI()
encode_session: aiohttp.ClientSession | None = None
decode_session: aiohttp.ClientSession | None = None

# Round-robin counter so concurrent single-MM-item requests still
# spread across encoders.
_encode_rr_counter: int = 0

MM_TYPES = {"image_url", "audio_url", "input_audio"}


def extract_mm_items(request_data: dict) -> list[dict]:
    items: list[dict] = []
    for msg in request_data.get("messages", []):
        content = msg.get("content")
        if not isinstance(content, list):
            continue
        for item in content:
            if item.get("type") in MM_TYPES:
                items.append(item)
    return items


async def fanout_encoder_primer(
    orig_request: dict,
    e_urls: list[str],
    req_id: str,
) -> None:
    mm_items = extract_mm_items(orig_request)
    if not mm_items:
        return

    global _encode_rr_counter
    start = _encode_rr_counter
    _encode_rr_counter = (start + len(mm_items)) % max(len(e_urls), 1)
    url_cycle = (e_urls[(start + i) % len(e_urls)] for i in range(len(mm_items)))

    tasks = []
    for idx, (item, target_url) in enumerate(zip(mm_items, url_cycle)):
        child_req_id = f"{req_id}:{idx}:{uuid.uuid4().hex[:6]}"
        headers = {"x-request-id": child_req_id}
        encoder_req = {
            "model": orig_request.get("model"),
            "messages": [{"role": "user", "content": [item]}],
            "max_tokens": 1,
            "stream": False,
        }
        tasks.append(
            encode_session.post(
                f"{target_url}/v1/chat/completions",
                json=encoder_req,
                headers=headers,
            )
        )

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, r in enumerate(results):
        if isinstance(r, Exception):
            raise HTTPException(
                status_code=502, detail=f"Encoder request failed: {str(r)}"
            )
        if r.status != 200:
            try:
                detail = await r.text()
            except Exception:
                detail = "<unable to read body>"
            raise HTTPException(
                status_code=r.status,
                detail=f"Encoder request failed: {detail}",
            )


@app.on_event("startup")
async def on_startup() -> None:
    global encode_session, decode_session
    timeout = aiohttp.ClientTimeout(total=100_000)
    enc_connector = aiohttp.TCPConnector(limit=0, force_close=True)
    dec_connector = aiohttp.TCPConnector(limit=0, force_close=True)
    encode_session = aiohttp.ClientSession(timeout=timeout, connector=enc_connector)
    decode_session = aiohttp.ClientSession(timeout=timeout, connector=dec_connector)


@app.on_event("shutdown")
async def on_shutdown() -> None:
    if encode_session:
        await encode_session.close()
    if decode_session:
        await decode_session.close()


async def forward_non_stream(
    req_data: dict, req_id: str, e_urls: list[str], d_url: str
) -> dict:
    await fanout_encoder_primer(req_data, e_urls, req_id)
    headers = {"x-request-id": req_id}
    async with decode_session.post(
        f"{d_url}/v1/chat/completions", json=req_data, headers=headers
    ) as resp:
        resp.raise_for_status()
        return await resp.json()


async def forward_stream(
    req_data: dict, req_id: str, e_urls: list[str], d_url: str
) -> AsyncIterator[str]:
    await fanout_encoder_primer(req_data, e_urls, req_id)
    headers = {"x-request-id": req_id}
    async with decode_session.post(
        f"{d_url}/v1/chat/completions", json=req_data, headers=headers,
    ) as resp:
        resp.raise_for_status()
        async for chunk in resp.content.iter_chunked(1024):
            if chunk:
                yield chunk.decode("utf-8", errors="ignore")


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    req_data = await request.json()
    req_id = request.headers.get("x-request-id", str(uuid.uuid4()))
    e_urls = app.state.e_urls
    d_url = random.choice(app.state.d_urls)
    if req_data.get("stream", False):
        return StreamingResponse(
            forward_stream(req_data, req_id, e_urls, d_url),
            media_type="text/event-stream",
        )
    result = await forward_non_stream(req_data, req_id, e_urls, d_url)
    return JSONResponse(content=result)


@app.get("/v1/models")
async def list_models():
    async with decode_session.get(f"{app.state.d_urls[0]}/v1/models") as resp:
        resp.raise_for_status()
        return await resp.json()


@app.get("/health")
async def health_check():
    async def healthy(urls):
        if not urls:
            return "empty"
        for u in urls:
            try:
                async with encode_session.get(f"{u}/health") as resp:
                    resp.raise_for_status()
            except Exception:
                return "unhealthy"
        return "healthy"

    e_status, d_status = await asyncio.gather(
        healthy(app.state.e_urls),
        healthy(app.state.d_urls),
    )
    overall_healthy = all(s != "unhealthy" for s in (e_status, d_status))
    return JSONResponse(
        {
            "proxy": "healthy",
            "encode_cluster": e_status,
            "decode_cluster": d_status,
        },
        status_code=200 if overall_healthy else 503,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="EC Disaggregated Encoder Proxy (E+PD mode)"
    )
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=19000)
    parser.add_argument(
        "--encode-servers-urls",
        required=True,
        help='Comma-separated encoder URLs ("http://127.0.0.1:19101,...")',
    )
    parser.add_argument(
        "--decode-servers-urls",
        required=True,
        help='Comma-separated decode (PD) URLs ("http://127.0.0.1:19100")',
    )
    args = parser.parse_args()
    app.state.e_urls = [u.strip() for u in args.encode_servers_urls.split(",") if u.strip()]
    app.state.d_urls = [u.strip() for u in args.decode_servers_urls.split(",") if u.strip()]
    uvicorn.run(app, host=args.host, port=args.port, log_level="info", loop="uvloop")

4.6 요청 전송

세 컴포넌트가 모두 준비되면 표준 OpenAI 호환 chat completions 요청으로 proxy를 호출할 수 있습니다. 단일 요청에 여러 멀티모달 아이템이 포함된 경우, proxy는 content 배열의 모든 image_url / audio_url을 자동으로 추출해 encoder 클러스터로 병렬로 분배(round-robin) 한 뒤, 원본 요청을 PD로 전달합니다.

다음 예시는 이미지 8개를 포함한 요청 한 번으로 8개의 encoder 인스턴스를 동시에 활용합니다.

curl -sS http://127.0.0.1:19000/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d '{
        "model": "Qwen3-VL-8B-Instruct",
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000039769.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000397133.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000252219.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000087038.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000174482.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000403385.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000037777.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000000139.jpg"}},
                {"type": "text", "text": "Briefly describe each of these images."}
            ]
        }],
        "max_tokens": 256
    }'

응답은 표준 OpenAI chat completion JSON이며, assistant 메시지에 8개 이미지 각각에 대한 설명이 순차적으로 포함됩니다.

5. 개선 방안

5.1 PD 컴파일에 bucketing 적용

이 가이드의 컴파일 설정은 PD 측에 단일 decoder shape(decoder_batch_sizes=[8])만 생성합니다. 동시에 처리 중인 요청이 8개 미만일 때도 디코드 단계는 batch-8 그래프의 비용을 그대로 지불하게 됩니다. bucketing을 적용하면 rebel-compiler가 여러 디코드 셰이프를 하나의 컴파일 결과에 묶고, 런타임이 현재 활성 batch에 가장 잘 맞는 작은 bucket을 선택합니다. 이를 통해 tensor_parallel_size를 변경하지 않고도 저·중 부하에서 throughput을 향상시킬 수 있습니다.

시작점으로 decoder bucket 몇 단계를 추가하는 방식이 일반적입니다.

"decoder_batch_sizes": [1, 2, 4, 8],

PD는 동일한 8 NPU에서 tensor_parallel_size=8로 계속 실행되며, decoder 디스패치만 step마다 bucket을 선택합니다. 동작 메커니즘과 트레이드오프(bucket이 많아질수록 컴파일 시간이 늘어나고 컴파일 결과 크기가 커짐)는 버케팅 튜토리얼을 참고하세요.

6. 제약사항

  • Qwen3-VL 계열 모델만 지원합니다.
  • N encoder + 1 PD 토폴로지만 지원됩니다.
  • PD는 encoder가 접속할 connector 소켓을 바인드하므로, 반드시 encoder보다 먼저 실행해야 합니다.
  • 모든 encoder와 PD 인스턴스는 설정한 llm_host / llm_pull_port로 서로 도달 가능해야 합니다.
  • 본 기능은 실험적입니다. 플래그명, connector 설정, proxy 인터페이스는 변경될 수 있습니다.