Skip to content

Disaggregated Encoder

Disaggregated Encoder beta status

Disaggregated Encoder is currently in beta and under active development. Configuration knobs, connector behavior, and the proxy interface may change between releases, and backward compatibility is not guaranteed. Use in production workloads is not recommended yet. Feedback is welcome to help stabilize the feature toward general availability.

1. Overview

For multimodal models, the visual encoder and the language model have different compute and scheduling characteristics. The encoder runs in short, bursty steps and underutilizes the NPUs that the language model would otherwise use for prefill and decode.

Disaggregated Encoder (EC disaggregation) separates these stages into independent vLLM processes:

  • Encoder instances run only the visual encoder.
  • A PD (Prefill+Decode) instance runs the language model.
  • Encoded multimodal features are transferred from encoders to the PD by the EC connector.
  • A lightweight proxy in front fans multimodal items out to encoders and forwards the original request to the PD.

This allows the encoder to scale horizontally (multiple instances on a few NPUs each) while keeping the LLM stage on a dedicated set of NPUs.

1.1 Current support

  • Topology: N encoders + 1 PD (combined prefill + decode).
  • Models: Qwen3-VL family only.
  • Connector: RblnECNixlConnector handles the encoder-cache transfer between encoders and the PD.

2. Architecture

Disaggregated encoder request flow

Request flow:

  1. The proxy extracts every image / audio item from the incoming chat request.
  2. It sends one request per multimodal item to the encoder cluster (round-robin), with text removed. The encoders compute visual features and stage them on the PD via the connector.
  3. The proxy forwards the original request to the PD. The PD pulls the encoder cache and runs prefill and decode.

3. Prerequisites

  • System requirements:
  • Required packages:
    • RBLN Compiler
    • optimum-rbln (with Qwen3-VL support)
    • vllm-rbln v0.10.3 or newer (the version that introduced RblnECNixlConnector).
    • nixl — used by RblnECNixlConnector for the encoder-cache transfer. Install with pip install nixl.
  • NPUs: 1 NPU per encoder instance + 8 NPUs for the PD instance. Example below uses 8 encoders + 1 PD = 16 NPUs total.
  • Compiled model: Qwen3-VL compile result is reused as-is on both encoder and PD instances; no separate encoder-only compilation is required. See the compilation step below.

4. Execution

This section describes the end-to-end setup. Perform the steps in order: configure the ports and devices, compile the model once, start the PD followed by the encoders and the proxy, then send a request.

4.1 Port and device configuration

Each instance exposes its own HTTP port for OpenAI-compatible serving, and the encoder ↔ PD link uses one additional socket. Use any free ports in your environment; the values listed below must be kept in sync across components.

Component Setting Must match
Proxy --port URL the client sends requests to
PD --port each entry of --decode-servers-urls on the proxy
Encoder --port each entry of --encode-servers-urls on the proxy
PD pull_host / pull_port
(ec-transfer-config)
every encoder's llm_host / llm_pull_port

The example commands throughout this guide use the following values; replace them with values available in your environment:

Component HTTP port Connector port NPU device(s)
Proxy 19000 n/a
PD 19100 19500 0,1,2,3,4,5,6,7
Encoder 0 19101 19500 8
Encoder 1 19102 19500 9
Encoder 2 19103 19500 10
... ... ... ...
Encoder 7 19108 19500 15

The connector port is the same value across the PD (pull_port, where it binds) and every encoder (llm_pull_port, where it connects).

4.2 Compile the model

This guide uses Qwen3-VL-8B-Instruct as the running example. Compile it once with optimum-rbln and reuse the resulting directory on both encoder and PD instances. The compile config below matches the topology used later in this guide: the language model runs on 8 NPUs (tensor_parallel_size=8) on the PD side, and the visual encoder runs on a single NPU (visual.tensor_parallel_size=1) on each encoder side.

from optimum.rbln import RBLNQwen3VLForConditionalGeneration

model_id = "Qwen/Qwen3-VL-8B-Instruct"
model = RBLNQwen3VLForConditionalGeneration.from_pretrained(
    model_id,
    export=True,
    rbln_config={
        "visual": {
            "max_seq_lens": 5440,
            "tensor_parallel_size": 1,
        },
        "tensor_parallel_size": 8,
        "max_seq_len": 32_768,
        "batch_size": 8,
        "decoder_batch_sizes": [8],
    },
)
model.save_pretrained("Qwen3-VL-8B-Instruct")

The commands in the following sections assume the compiled directory is named Qwen3-VL-8B-Instruct and is reachable from the working directory where vllm serve is invoked.

4.3 Launch the PD instance

The PD instance is a regular vllm serve process with an --ec-transfer-config that selects RblnECNixlConnector in ec_consumer role. It listens for encoders on pull_host / pull_port.

RBLN_DEVICES=0,1,2,3,4,5,6,7 \
vllm serve Qwen3-VL-8B-Instruct \
    --port 19100 \
    --mm-processor-kwargs '{"max_pixels": 802816}' \
    --ec-transfer-config '{
        "ec_connector": "RblnECNixlConnector",
        "ec_role": "ec_consumer",
        "ec_connector_extra_config": {
            "pull_host": "0.0.0.0",
            "pull_port": 19500
        }
    }'
Full launcher script: serve_ec_llm.sh
#!/usr/bin/env bash
# Start the EC disaggregation PD instance using RblnECNixlConnector.
#
# Usage:
#   bash serve_ec_llm.sh
#
# The PD listens for encoder metadata. Start this BEFORE the encoders.

MODEL_ID="${1:-Qwen3-VL-8B-Instruct}"
PORT="${2:-19100}"

# Connector bind address (encoders connect here)
PULL_HOST="${PULL_HOST:-0.0.0.0}"
PULL_PORT="${PULL_PORT:-19500}"

# PD devices
LLM_DEVICES="${LLM_DEVICES:-0,1,2,3,4,5,6,7}"

export RBLN_DEVICES=$LLM_DEVICES
exec vllm serve "$MODEL_ID" \
    --port "$PORT" \
    --mm-processor-kwargs '{"max_pixels": 802816}' \
    --ec-transfer-config "{
        \"ec_connector\": \"RblnECNixlConnector\",
        \"ec_role\": \"ec_consumer\",
        \"ec_connector_extra_config\": {
            \"pull_host\": \"$PULL_HOST\",
            \"pull_port\": $PULL_PORT
        }
    }"

Wait until the PD is healthy before launching encoders:

until curl -sf http://127.0.0.1:19100/health > /dev/null; do sleep 2; done

4.4 Launch encoder instances

Each encoder is another vllm serve process with ec_role="ec_producer" and the PD's pull endpoint set as llm_host / llm_pull_port. Give each encoder its own NPU via RBLN_DEVICES.

The example below launches eight encoders on devices 8..15, listening on ports 19101..19108:

for i in $(seq 0 7); do
    RBLN_DEVICES=$((8 + i)) vllm serve Qwen3-VL-8B-Instruct \
        --port $((19101 + i)) \
        --mm-processor-kwargs '{"max_pixels": 802816}' \
        --ec-transfer-config '{
            "ec_connector": "RblnECNixlConnector",
            "ec_role": "ec_producer",
            "ec_connector_extra_config": {
                "llm_host": "127.0.0.1",
                "llm_pull_port": 19500
            }
        }' &
done
Full launcher script: serve_ec_encoder.sh
#!/usr/bin/env bash
# Start EC disaggregation encoder(s) using RblnECNixlConnector.
#
# Usage:
#   NUM_ENCODERS=8 bash serve_ec_encoder.sh
#
# Each encoder gets 1 device and connects to the PD's pull port.
# Start the PD FIRST with: bash serve_ec_llm.sh

MODEL_ID="${1:-Qwen3-VL-8B-Instruct}"
BASE_PORT="${2:-19101}"
BASE_DEVICE="${BASE_DEVICE:-8}"
NUM_ENCODERS="${NUM_ENCODERS:-8}"

# PD pull endpoint (where encoders connect)
LLM_HOST="${LLM_HOST:-127.0.0.1}"
LLM_PULL_PORT="${LLM_PULL_PORT:-19500}"

# Device list: either explicit or auto-generated
if [ -n "$RBLN_DEVICE_LIST" ]; then
    IFS=',' read -ra DEVICES <<< "$RBLN_DEVICE_LIST"
    NUM_ENCODERS="${#DEVICES[@]}"
else
    DEVICES=()
    for i in $(seq 0 $((NUM_ENCODERS - 1))); do
        DEVICES+=($((BASE_DEVICE + i)))
    done
fi

launch_encoder() {
    local idx=$1
    local device=${DEVICES[$idx]}
    local port=$((BASE_PORT + idx))

    echo "Starting encoder $idx (device=$device, port=$port, push→$LLM_HOST:$LLM_PULL_PORT)"
    RBLN_DEVICES=$device vllm serve "$MODEL_ID" \
        --port "$port" \
        --mm-processor-kwargs '{"max_pixels": 802816}' \
        --ec-transfer-config "{
            \"ec_connector\": \"RblnECNixlConnector\",
            \"ec_role\": \"ec_producer\",
            \"ec_connector_extra_config\": {
                \"llm_host\": \"$LLM_HOST\",
                \"llm_pull_port\": $LLM_PULL_PORT
            }
        }" &
}

for i in $(seq 0 $((NUM_ENCODERS - 1))); do
    launch_encoder "$i"
done

echo "Launched $NUM_ENCODERS encoder(s). Press Ctrl+C to stop all."
trap 'kill $(jobs -p) 2>/dev/null; wait' INT TERM
wait

4.5 Launch the proxy

The proxy is an OpenAI-compatible front end that fans multimodal items out to the encoder cluster and forwards the original request to the PD. It exposes /v1/chat/completions, /v1/models, and /health. The full source is in the vllm-rbln repository.

1
2
3
4
5
python client_ec_disaggregated.py \
    --host 0.0.0.0 \
    --port 19000 \
    --encode-servers-urls "http://127.0.0.1:19101,http://127.0.0.1:19102,http://127.0.0.1:19103,http://127.0.0.1:19104,http://127.0.0.1:19105,http://127.0.0.1:19106,http://127.0.0.1:19107,http://127.0.0.1:19108" \
    --decode-servers-urls "http://127.0.0.1:19100"
Full proxy script: client_ec_disaggregated.py
#!/usr/bin/env python3
"""
EC Disaggregated Encoder Proxy

Routes OpenAI-compatible "/v1/chat/completions" requests to two clusters
for EC (Encoder Cache) disaggregation:

  * encode  (encoder — runs the visual encoder, stages encoder cache)
  * decode  (PD — receives encoder cache, runs prefill + decode)

For multimodal input we:
    1. Extract every image/audio item from the request.
    2. Fire N concurrent requests to the encoder cluster (one per MM
       item, with all text removed).
    3. Wait for all of them to succeed (encoder caches are now staged
       on the PD side).
    4. Forward the original request to a decode (PD) server.
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import random
import uuid
from collections.abc import AsyncIterator

import aiohttp
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse

logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s %(levelname)s: %(message)s"
)
logger = logging.getLogger("ec_proxy")

app = FastAPI()
encode_session: aiohttp.ClientSession | None = None
decode_session: aiohttp.ClientSession | None = None

# Round-robin counter so concurrent single-MM-item requests still
# spread across encoders.
_encode_rr_counter: int = 0

MM_TYPES = {"image_url", "audio_url", "input_audio"}


def extract_mm_items(request_data: dict) -> list[dict]:
    items: list[dict] = []
    for msg in request_data.get("messages", []):
        content = msg.get("content")
        if not isinstance(content, list):
            continue
        for item in content:
            if item.get("type") in MM_TYPES:
                items.append(item)
    return items


async def fanout_encoder_primer(
    orig_request: dict,
    e_urls: list[str],
    req_id: str,
) -> None:
    mm_items = extract_mm_items(orig_request)
    if not mm_items:
        return

    global _encode_rr_counter
    start = _encode_rr_counter
    _encode_rr_counter = (start + len(mm_items)) % max(len(e_urls), 1)
    url_cycle = (e_urls[(start + i) % len(e_urls)] for i in range(len(mm_items)))

    tasks = []
    for idx, (item, target_url) in enumerate(zip(mm_items, url_cycle)):
        child_req_id = f"{req_id}:{idx}:{uuid.uuid4().hex[:6]}"
        headers = {"x-request-id": child_req_id}
        encoder_req = {
            "model": orig_request.get("model"),
            "messages": [{"role": "user", "content": [item]}],
            "max_tokens": 1,
            "stream": False,
        }
        tasks.append(
            encode_session.post(
                f"{target_url}/v1/chat/completions",
                json=encoder_req,
                headers=headers,
            )
        )

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, r in enumerate(results):
        if isinstance(r, Exception):
            raise HTTPException(
                status_code=502, detail=f"Encoder request failed: {str(r)}"
            )
        if r.status != 200:
            try:
                detail = await r.text()
            except Exception:
                detail = "<unable to read body>"
            raise HTTPException(
                status_code=r.status,
                detail=f"Encoder request failed: {detail}",
            )


@app.on_event("startup")
async def on_startup() -> None:
    global encode_session, decode_session
    timeout = aiohttp.ClientTimeout(total=100_000)
    enc_connector = aiohttp.TCPConnector(limit=0, force_close=True)
    dec_connector = aiohttp.TCPConnector(limit=0, force_close=True)
    encode_session = aiohttp.ClientSession(timeout=timeout, connector=enc_connector)
    decode_session = aiohttp.ClientSession(timeout=timeout, connector=dec_connector)


@app.on_event("shutdown")
async def on_shutdown() -> None:
    if encode_session:
        await encode_session.close()
    if decode_session:
        await decode_session.close()


async def forward_non_stream(
    req_data: dict, req_id: str, e_urls: list[str], d_url: str
) -> dict:
    await fanout_encoder_primer(req_data, e_urls, req_id)
    headers = {"x-request-id": req_id}
    async with decode_session.post(
        f"{d_url}/v1/chat/completions", json=req_data, headers=headers
    ) as resp:
        resp.raise_for_status()
        return await resp.json()


async def forward_stream(
    req_data: dict, req_id: str, e_urls: list[str], d_url: str
) -> AsyncIterator[str]:
    await fanout_encoder_primer(req_data, e_urls, req_id)
    headers = {"x-request-id": req_id}
    async with decode_session.post(
        f"{d_url}/v1/chat/completions", json=req_data, headers=headers,
    ) as resp:
        resp.raise_for_status()
        async for chunk in resp.content.iter_chunked(1024):
            if chunk:
                yield chunk.decode("utf-8", errors="ignore")


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    req_data = await request.json()
    req_id = request.headers.get("x-request-id", str(uuid.uuid4()))
    e_urls = app.state.e_urls
    d_url = random.choice(app.state.d_urls)
    if req_data.get("stream", False):
        return StreamingResponse(
            forward_stream(req_data, req_id, e_urls, d_url),
            media_type="text/event-stream",
        )
    result = await forward_non_stream(req_data, req_id, e_urls, d_url)
    return JSONResponse(content=result)


@app.get("/v1/models")
async def list_models():
    async with decode_session.get(f"{app.state.d_urls[0]}/v1/models") as resp:
        resp.raise_for_status()
        return await resp.json()


@app.get("/health")
async def health_check():
    async def healthy(urls):
        if not urls:
            return "empty"
        for u in urls:
            try:
                async with encode_session.get(f"{u}/health") as resp:
                    resp.raise_for_status()
            except Exception:
                return "unhealthy"
        return "healthy"

    e_status, d_status = await asyncio.gather(
        healthy(app.state.e_urls),
        healthy(app.state.d_urls),
    )
    overall_healthy = all(s != "unhealthy" for s in (e_status, d_status))
    return JSONResponse(
        {
            "proxy": "healthy",
            "encode_cluster": e_status,
            "decode_cluster": d_status,
        },
        status_code=200 if overall_healthy else 503,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="EC Disaggregated Encoder Proxy (E+PD mode)"
    )
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=19000)
    parser.add_argument(
        "--encode-servers-urls",
        required=True,
        help='Comma-separated encoder URLs ("http://127.0.0.1:19101,...")',
    )
    parser.add_argument(
        "--decode-servers-urls",
        required=True,
        help='Comma-separated decode (PD) URLs ("http://127.0.0.1:19100")',
    )
    args = parser.parse_args()
    app.state.e_urls = [u.strip() for u in args.encode_servers_urls.split(",") if u.strip()]
    app.state.d_urls = [u.strip() for u in args.decode_servers_urls.split(",") if u.strip()]
    uvicorn.run(app, host=args.host, port=args.port, log_level="info", loop="uvloop")

4.6 Sending requests

Once all three components are up, send a standard OpenAI-compatible chat completions request to the proxy. When a single request carries multiple multimodal items, the proxy automatically extracts every image_url / audio_url from the content array and fans them out to the encoder cluster in parallel (round-robin across encoders), then forwards the original request to the PD.

The example below sends one request with 8 images, exercising all eight encoder instances at once:

curl -sS http://127.0.0.1:19000/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d '{
        "model": "Qwen3-VL-8B-Instruct",
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000039769.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000397133.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000252219.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000087038.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000174482.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000403385.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000037777.jpg"}},
                {"type": "image_url", "image_url": {"url": "http://images.cocodataset.org/val2017/000000000139.jpg"}},
                {"type": "text", "text": "Briefly describe each of these images."}
            ]
        }],
        "max_tokens": 256
    }'

The response is a standard OpenAI chat completion JSON; the assistant message describes all eight images in sequence.

5. Improvements

5.1 Apply bucketing to the PD compile

The compile config in this guide produces a single decoder shape (decoder_batch_sizes=[8]) on the PD. Whenever fewer than eight requests are in flight, decode steps still pay the cost of the full batch-8 graph. With bucketing, the rebel-compiler can pack several decode shapes into one compiled artifact and the runtime selects the smallest bucket that fits the live batch, increasing throughput at low and medium load without changing tensor_parallel_size.

A typical starting point is to add a few decoder buckets:

"decoder_batch_sizes": [1, 2, 4, 8],

The PD continues to run tensor_parallel_size=8 across the same 8 NPUs; only the decoder dispatch selects a bucket per step. See the Bucketing tutorial for the underlying mechanism and trade-offs (more buckets mean longer compile time and a larger compiled artifact).

6. Limitations

  • Only Qwen3-VL family models are supported.
  • Only the N encoders + 1 PD topology is supported.
  • The PD must be started before the encoders, because it binds the connector socket that encoders connect to.
  • All encoder and PD instances must be reachable on the configured llm_host / llm_pull_port.
  • This feature is experimental — flag names, connector configuration, and the proxy interface may change.