
vllm.entrypoints.openai.run_batch

BatchRequestInput

Bases: OpenAIBaseModel

The per-line object of the batch input file.

NOTE: Currently only the /v1/chat/completions endpoint is supported.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchRequestInput(OpenAIBaseModel):
    """
    The per-line object of the batch input file.

    NOTE: Currently only the `/v1/chat/completions` endpoint is supported.
    """

    # A developer-provided per-request id that will be used to match outputs to
    # inputs. Must be unique for each request in a batch.
    custom_id: str

    # The HTTP method to be used for the request. Currently only POST is
    # supported.
    method: str

    # The OpenAI API relative URL to be used for the request. Currently
    # /v1/chat/completions is supported.
    url: str

    # The parameters of the request.
    body: BatchRequestInputBody

    @field_validator("body", mode="plain")
    @classmethod
    def check_type_for_url(cls, value: Any, info: ValidationInfo):
        # Use url to disambiguate models
        url: str = info.data["url"]
        if url == "/v1/chat/completions":
            return ChatCompletionRequest.model_validate(value)
        if url == "/v1/embeddings":
            return TypeAdapter(EmbeddingRequest).validate_python(value)
        if url.endswith("/score"):
            return TypeAdapter(ScoreRequest).validate_python(value)
        if url.endswith("/rerank"):
            return RerankRequest.model_validate(value)
        if url == "/v1/audio/transcriptions":
            return BatchTranscriptionRequest.model_validate(value)
        if url == "/v1/audio/translations":
            return BatchTranslationRequest.model_validate(value)
        return TypeAdapter(BatchRequestInputBody).validate_python(value)
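
As a usage sketch (not part of the module), a single input line can be validated directly with pydantic; the body is parsed into the request model that matches url. The model name and message content below are placeholders:

import json

from vllm.entrypoints.openai.run_batch import BatchRequestInput

# One line of a batch input .jsonl file. custom_id is caller-chosen and must
# be unique within the batch; the model name is a placeholder.
line = json.dumps({
    "custom_id": "request-1",
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "my-model",
        "messages": [{"role": "user", "content": "Hello!"}],
    },
})

# check_type_for_url sees url == "/v1/chat/completions" and therefore parses
# the body as a ChatCompletionRequest.
request = BatchRequestInput.model_validate_json(line)
print(type(request.body).__name__)  # ChatCompletionRequest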

BatchRequestOutput

Bases: OpenAIBaseModel

The per-line object of the batch output and error files.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchRequestOutput(OpenAIBaseModel):
    """
    The per-line object of the batch output and error files
    """

    id: str

    # A developer-provided per-request id that will be used to match outputs to
    # inputs.
    custom_id: str

    response: BatchResponseData | None

    # For requests that failed with a non-HTTP error, this will contain more
    # information on the cause of the failure.
    error: Any | None
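
A minimal sketch of consuming an output (or error) file, matching results back to inputs by custom_id; the file name is a placeholder:

from vllm.entrypoints.openai.run_batch import BatchRequestOutput

# Hypothetical output path; run_batch writes one JSON object per line.
with open("results.jsonl", encoding="utf-8") as f:
    for raw_line in f:
        output = BatchRequestOutput.model_validate_json(raw_line)
        if output.error is not None:
            print(f"{output.custom_id}: failed ({output.error})")
        else:
            print(f"{output.custom_id}: succeeded")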

BatchTranscriptionRequest

Bases: TranscriptionRequest

Batch transcription request that uses file_url instead of file.

This class extends TranscriptionRequest but replaces the file field with file_url, so that batch lines, which are written as JSON, can reference audio by URL instead of uploading a file.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchTranscriptionRequest(TranscriptionRequest):
    """
    Batch transcription request that uses file_url instead of file.

    This class extends TranscriptionRequest but replaces the file field
    with file_url to support batch processing from audio files written in JSON format.
    """

    file_url: str = Field(
        ...,
        description=(
            "Either a URL of the audio or a data URL with base64 encoded audio data. "
        ),
    )

    # Override file to be optional and unused for batch processing
    file: UploadFile | None = Field(default=None, exclude=True)  # type: ignore[assignment]

    @model_validator(mode="before")
    @classmethod
    def validate_no_file(cls, data: Any):
        """Ensure file field is not provided in batch requests."""
        if isinstance(data, dict) and "file" in data:
            raise ValueError(
                "The 'file' field is not supported in batch requests. "
                "Use 'file_url' instead."
            )
        return data
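
For illustration, a transcription batch line carries its audio as file_url (an http(s) URL or a base64 data URL) instead of a multipart file upload; the model name and audio bytes below are placeholders:

import base64
import json

# Placeholder audio bytes; real requests would embed or link actual audio data.
audio_b64 = base64.b64encode(b"...").decode()

line = json.dumps({
    "custom_id": "transcribe-1",
    "method": "POST",
    "url": "/v1/audio/transcriptions",
    "body": {
        "model": "my-asr-model",
        # Either an http(s) URL or a data URL is accepted.
        "file_url": f"data:audio/wav;base64,{audio_b64}",
        # Supplying "file" here would be rejected by validate_no_file.
    },
})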

validate_no_file classmethod

validate_no_file(data: Any)

Ensure file field is not provided in batch requests.

Source code in vllm/entrypoints/openai/run_batch.py
@model_validator(mode="before")
@classmethod
def validate_no_file(cls, data: Any):
    """Ensure file field is not provided in batch requests."""
    if isinstance(data, dict) and "file" in data:
        raise ValueError(
            "The 'file' field is not supported in batch requests. "
            "Use 'file_url' instead."
        )
    return data

BatchTranslationRequest

Bases: TranslationRequest

Batch translation request that uses file_url instead of file.

This class extends TranslationRequest but replaces the file field with file_url, so that batch lines, which are written as JSON, can reference audio by URL instead of uploading a file.

Source code in vllm/entrypoints/openai/run_batch.py
class BatchTranslationRequest(TranslationRequest):
    """
    Batch translation request that uses file_url instead of file.

    This class extends TranslationRequest but replaces the file field
    with file_url to support batch processing from audio files written in JSON format.
    """

    file_url: str = Field(
        ...,
        description=(
            "Either a URL of the audio or a data URL with base64 encoded audio data. "
        ),
    )

    # Override file to be optional and unused for batch processing
    file: UploadFile | None = Field(default=None, exclude=True)  # type: ignore[assignment]

    @model_validator(mode="before")
    @classmethod
    def validate_no_file(cls, data: Any):
        """Ensure file field is not provided in batch requests."""
        if isinstance(data, dict) and "file" in data:
            raise ValueError(
                "The 'file' field is not supported in batch requests. "
                "Use 'file_url' instead."
            )
        return data

validate_no_file classmethod

validate_no_file(data: Any)

Ensure file field is not provided in batch requests.

Source code in vllm/entrypoints/openai/run_batch.py
@model_validator(mode="before")
@classmethod
def validate_no_file(cls, data: Any):
    """Ensure file field is not provided in batch requests."""
    if isinstance(data, dict) and "file" in data:
        raise ValueError(
            "The 'file' field is not supported in batch requests. "
            "Use 'file_url' instead."
        )
    return data

build_endpoint_registry

build_endpoint_registry(
    engine_client: EngineClient,
    args: Namespace,
    base_model_paths: list[BaseModelPath],
    request_logger: RequestLogger | None,
    supported_tasks: tuple[SupportedTask, ...],
) -> dict[str, dict[str, Any]]

Build the endpoint registry with all serving objects and handler configurations.

Parameters:

    engine_client (EngineClient): The engine client. Required.
    args (Namespace): Command line arguments. Required.
    base_model_paths (list[BaseModelPath]): List of base model paths. Required.
    request_logger (RequestLogger | None): Optional request logger. Required.
    supported_tasks (tuple[SupportedTask, ...]): Tuple of supported tasks. Required.

Returns:

    dict[str, dict[str, Any]]: Dictionary mapping endpoint keys to their configurations.

Source code in vllm/entrypoints/openai/run_batch.py
def build_endpoint_registry(
    engine_client: EngineClient,
    args: Namespace,
    base_model_paths: list[BaseModelPath],
    request_logger: RequestLogger | None,
    supported_tasks: tuple[SupportedTask, ...],
) -> dict[str, dict[str, Any]]:
    """
    Build the endpoint registry with all serving objects and handler configurations.

    Args:
        engine_client: The engine client
        args: Command line arguments
        base_model_paths: List of base model paths
        request_logger: Optional request logger
        supported_tasks: Tuple of supported tasks

    Returns:
        Dictionary mapping endpoint keys to their configurations
    """
    model_config = engine_client.model_config

    # Create the openai serving objects.
    openai_serving_models = OpenAIServingModels(
        engine_client=engine_client,
        base_model_paths=base_model_paths,
        lora_modules=None,
    )

    openai_serving_chat = (
        OpenAIServingChat(
            engine_client,
            openai_serving_models,
            args.response_role,
            request_logger=request_logger,
            chat_template=None,
            chat_template_content_format="auto",
            reasoning_parser=args.structured_outputs_config.reasoning_parser,
            enable_prompt_tokens_details=args.enable_prompt_tokens_details,
            enable_force_include_usage=args.enable_force_include_usage,
            default_chat_template_kwargs=getattr(
                args, "default_chat_template_kwargs", None
            ),
        )
        if "generate" in supported_tasks
        else None
    )

    openai_serving_embedding = (
        OpenAIServingEmbedding(
            engine_client,
            openai_serving_models,
            request_logger=request_logger,
            chat_template=None,
            chat_template_content_format="auto",
        )
        if "embed" in supported_tasks
        else None
    )

    enable_serving_reranking = (
        "classify" in supported_tasks
        and getattr(model_config.hf_config, "num_labels", 0) == 1
    )

    openai_serving_scores = (
        ServingScores(
            engine_client,
            openai_serving_models,
            request_logger=request_logger,
            score_template=None,
        )
        if ("embed" in supported_tasks or enable_serving_reranking)
        else None
    )

    openai_serving_transcription = (
        OpenAIServingTranscription(
            engine_client,
            openai_serving_models,
            request_logger=request_logger,
            enable_force_include_usage=args.enable_force_include_usage,
        )
        if "transcription" in supported_tasks
        else None
    )

    openai_serving_translation = (
        OpenAIServingTranslation(
            engine_client,
            openai_serving_models,
            request_logger=request_logger,
            enable_force_include_usage=args.enable_force_include_usage,
        )
        if "transcription" in supported_tasks
        else None
    )

    # Registry of endpoint configurations
    endpoint_registry: dict[str, dict[str, Any]] = {
        "completions": {
            "url_matcher": lambda url: url == "/v1/chat/completions",
            "handler_getter": lambda: (
                openai_serving_chat.create_chat_completion
                if openai_serving_chat is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "embeddings": {
            "url_matcher": lambda url: url == "/v1/embeddings",
            "handler_getter": lambda: (
                openai_serving_embedding.create_embedding
                if openai_serving_embedding is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "score": {
            "url_matcher": lambda url: url.endswith("/score"),
            "handler_getter": lambda: (
                openai_serving_scores.create_score
                if openai_serving_scores is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "rerank": {
            "url_matcher": lambda url: url.endswith("/rerank"),
            "handler_getter": lambda: (
                openai_serving_scores.do_rerank
                if openai_serving_scores is not None
                else None
            ),
            "wrapper_fn": None,
        },
        "transcriptions": {
            "url_matcher": lambda url: url == "/v1/audio/transcriptions",
            "handler_getter": lambda: (
                openai_serving_transcription.create_transcription
                if openai_serving_transcription is not None
                else None
            ),
            "wrapper_fn": make_transcription_wrapper(is_translation=False),
        },
        "translations": {
            "url_matcher": lambda url: url == "/v1/audio/translations",
            "handler_getter": lambda: (
                openai_serving_translation.create_translation
                if openai_serving_translation is not None
                else None
            ),
            "wrapper_fn": make_transcription_wrapper(is_translation=True),
        },
    }

    return endpoint_registry

download_bytes_from_url async

download_bytes_from_url(url: str) -> bytes

Download data from a URL or decode from a data URL.

Parameters:

    url (str): Either an HTTP/HTTPS URL or a data URL (data:...;base64,...). Required.

Returns:

    bytes: Data as bytes.

Source code in vllm/entrypoints/openai/run_batch.py
async def download_bytes_from_url(url: str) -> bytes:
    """
    Download data from a URL or decode from a data URL.

    Args:
        url: Either an HTTP/HTTPS URL or a data URL (data:...;base64,...)

    Returns:
        Data as bytes
    """
    parsed = urlparse(url)

    # Handle data URLs (base64 encoded)
    if parsed.scheme == "data":
        # Format: data:...;base64,<base64_data>
        if "," in url:
            header, data = url.split(",", 1)
            if "base64" in header:
                return base64.b64decode(data)
            else:
                raise ValueError(f"Unsupported data URL encoding: {header}")
        else:
            raise ValueError(f"Invalid data URL format: {url}")

    # Handle HTTP/HTTPS URLs
    elif parsed.scheme in ("http", "https"):
        async with (
            aiohttp.ClientSession() as session,
            session.get(url) as resp,
        ):
            if resp.status != 200:
                raise Exception(
                    f"Failed to download data from URL: {url}. Status: {resp.status}"
                )
            return await resp.read()

    else:
        raise ValueError(
            f"Unsupported URL scheme: {parsed.scheme}. "
            "Supported schemes: http, https, data"
        )
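
As a small sketch, a base64 data URL is decoded locally with no network access, while http(s) URLs are fetched via aiohttp:

import asyncio
import base64

from vllm.entrypoints.openai.run_batch import download_bytes_from_url

async def main() -> None:
    payload = base64.b64encode(b"hello").decode()
    data = await download_bytes_from_url(f"data:text/plain;base64,{payload}")
    assert data == b"hello"

asyncio.run(main())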

handle_endpoint_request

handle_endpoint_request(
    request: BatchRequestInput,
    tracker: BatchProgressTracker,
    url_matcher: Callable[[str], bool],
    handler_getter: Callable[[], Callable | None],
    wrapper_fn: WrapperFn | None = None,
) -> Awaitable[BatchRequestOutput] | None

Generic handler for endpoint requests.

Parameters:

    request (BatchRequestInput): The batch request input. Required.
    tracker (BatchProgressTracker): Progress tracker for the batch. Required.
    url_matcher (Callable[[str], bool]): Function that takes a URL and returns True if it matches. Required.
    handler_getter (Callable[[], Callable | None]): Function that returns the handler function or None. Required.
    wrapper_fn (WrapperFn | None): Optional function to wrap the handler (e.g., for transcriptions). Defaults to None.

Returns:

    Awaitable[BatchRequestOutput] | None: An Awaitable[BatchRequestOutput] if the request was handled, or None if the URL didn't match.

Source code in vllm/entrypoints/openai/run_batch.py
def handle_endpoint_request(
    request: BatchRequestInput,
    tracker: BatchProgressTracker,
    url_matcher: Callable[[str], bool],
    handler_getter: Callable[[], Callable | None],
    wrapper_fn: WrapperFn | None = None,
) -> Awaitable[BatchRequestOutput] | None:
    """
    Generic handler for endpoint requests.

    Args:
        request: The batch request input
        tracker: Progress tracker for the batch
        url_matcher: Function that takes a URL and returns True if it matches
        handler_getter: Function that returns the handler function or None
        wrapper_fn: Optional function to wrap the handler (e.g., for transcriptions)

    Returns:
        Awaitable[BatchRequestOutput] if the request was handled,
        None if URL didn't match
    """
    if not url_matcher(request.url):
        return None

    handler_fn = handler_getter()
    if handler_fn is None:
        error_msg = f"Model does not support endpoint: {request.url}"
        return make_async_error_request_output(request, error_msg=error_msg)

    # Apply wrapper if provided (e.g., for transcriptions/translations)
    if wrapper_fn is not None:
        handler_fn = wrapper_fn(handler_fn)

    tracker.submitted()
    return run_request(handler_fn, request, tracker)
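
Combined with build_endpoint_registry, this enables a simple dispatch loop that tries each registered endpoint until one claims the request. A simplified, hypothetical sketch (registry, tracker, and request are assumed to already exist):

# Hypothetical: registry comes from build_endpoint_registry(...),
# tracker is a BatchProgressTracker, request is a parsed BatchRequestInput.
response_future = None
for config in registry.values():
    response_future = handle_endpoint_request(
        request,
        tracker,
        url_matcher=config["url_matcher"],
        handler_getter=config["handler_getter"],
        wrapper_fn=config["wrapper_fn"],
    )
    if response_future is not None:
        break  # This endpoint matched the URL and produced an awaitable.

if response_future is None:
    # No url_matcher accepted request.url; the caller should emit an error
    # output for this line instead.
    ...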

make_transcription_wrapper

make_transcription_wrapper(
    is_translation: bool,
) -> WrapperFn

Factory function to create a wrapper for transcription/translation handlers. The wrapper converts BatchTranscriptionRequest or BatchTranslationRequest to TranscriptionRequest or TranslationRequest and calls the appropriate handler.

Parameters:

    is_translation (bool): If True, process as translation; otherwise process as transcription. Required.

Returns:

    WrapperFn: A function that takes a handler and returns a wrapped handler.

Source code in vllm/entrypoints/openai/run_batch.py
def make_transcription_wrapper(is_translation: bool) -> WrapperFn:
    """
    Factory function to create a wrapper for transcription/translation handlers.
    The wrapper converts BatchTranscriptionRequest or BatchTranslationRequest
    to TranscriptionRequest or TranslationRequest and calls the appropriate handler.

    Args:
        is_translation: If True, process as translation; otherwise process
            as transcription

    Returns:
        A function that takes a handler and returns a wrapped handler
    """

    def wrapper(handler_fn: Callable):
        async def transcription_wrapper(
            batch_request_body: (BatchTranscriptionRequest | BatchTranslationRequest),
        ) -> (
            TranscriptionResponse
            | TranscriptionResponseVerbose
            | TranslationResponse
            | TranslationResponseVerbose
            | ErrorResponse
        ):
            try:
                # Download data from URL
                audio_data = await download_bytes_from_url(batch_request_body.file_url)

                # Create a mock file from the downloaded audio data
                mock_file = UploadFile(
                    file=BytesIO(audio_data),
                    filename="audio.bin",
                )

                # Convert batch request to regular request
                # by copying all fields except file_url and setting file to mock_file
                request_dict = batch_request_body.model_dump(exclude={"file_url"})
                request_dict["file"] = mock_file

                if is_translation:
                    # Create TranslationRequest from BatchTranslationRequest
                    translation_request = TranslationRequest.model_validate(
                        request_dict
                    )
                    return await handler_fn(audio_data, translation_request)
                else:
                    # Create TranscriptionRequest from BatchTranscriptionRequest
                    transcription_request = TranscriptionRequest.model_validate(
                        request_dict
                    )
                    return await handler_fn(audio_data, transcription_request)
            except Exception as e:
                operation = "translation" if is_translation else "transcription"
                return ErrorResponse(
                    error=ErrorInfo(
                        message=f"Failed to process {operation}: {str(e)}",
                        type="BadRequestError",
                        code=HTTPStatus.BAD_REQUEST.value,
                    )
                )

        return transcription_wrapper

    return wrapper
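
The returned wrapper expects a handler with the (audio_data, request) calling convention used above. A minimal sketch with an explicitly made-up handler (a real one would be, e.g., the serving object's create_transcription and would return a TranscriptionResponse):

# Hypothetical stand-in for a real transcription handler.
async def fake_handler(audio_data, transcription_request):
    # A real handler returns a TranscriptionResponse (or verbose variant).
    return {"text": f"received {len(audio_data)} bytes"}

wrapped = make_transcription_wrapper(is_translation=False)(fake_handler)
# wrapped accepts a BatchTranscriptionRequest, downloads file_url, rebuilds a
# TranscriptionRequest without file_url, and awaits fake_handler with the audio bytes.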

upload_data async

upload_data(
    output_url: str, data_or_file: str, from_file: bool
) -> None

Upload data or a local file to a URL.

    output_url: The URL to upload the file to.
    data_or_file: Either the data to upload or the path to the file to upload.
    from_file: If True, data_or_file is the path to the file to upload.

Source code in vllm/entrypoints/openai/run_batch.py
async def upload_data(output_url: str, data_or_file: str, from_file: bool) -> None:
    """
    Upload a local file to a URL.
    output_url: The URL to upload the file to.
    data_or_file: Either the data to upload or the path to the file to upload.
    from_file: If True, data_or_file is the path to the file to upload.
    """
    # Timeout is a common issue when uploading large files.
    # We retry max_retries times before giving up.
    max_retries = 5
    # Number of seconds to wait before retrying.
    delay = 5

    for attempt in range(1, max_retries + 1):
        try:
            # We increase the timeout to 1000 seconds to allow
            # for large files (default is 300).
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=1000)
            ) as session:
                if from_file:
                    with open(data_or_file, "rb") as file:
                        async with session.put(output_url, data=file) as response:
                            if response.status != 200:
                                raise Exception(
                                    f"Failed to upload file.\n"
                                    f"Status: {response.status}\n"
                                    f"Response: {response.text()}"
                                )
                else:
                    async with session.put(output_url, data=data_or_file) as response:
                        if response.status != 200:
                            raise Exception(
                                f"Failed to upload data.\n"
                                f"Status: {response.status}\n"
                                f"Response: {response.text()}"
                            )

        except Exception as e:
            if attempt < max_retries:
                logger.error(
                    "Failed to upload data (attempt %d). Error message: %s.\nRetrying in %d seconds...",  # noqa: E501
                    attempt,
                    e,
                    delay,
                )
                await asyncio.sleep(delay)
            else:
                raise Exception(
                    f"Failed to upload data (attempt {attempt}). Error message: {str(e)}."  # noqa: E501
                ) from e
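
In the batch runner this is how results reach a pre-signed output URL. A minimal sketch with a placeholder URL and path:

import asyncio

from vllm.entrypoints.openai.run_batch import upload_data

# Hypothetical pre-signed PUT URL and local results file.
asyncio.run(upload_data(
    "https://example.com/presigned-put-url",
    "/tmp/results.jsonl",
    from_file=True,
))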

write_file async

write_file(
    path_or_url: str,
    batch_outputs: list[BatchRequestOutput],
    output_tmp_dir: str,
) -> None

Write batch_outputs to a file or upload to a URL.

    path_or_url: The path or URL to write batch_outputs to.
    batch_outputs: The list of batch outputs to write.
    output_tmp_dir: The directory to store the output file before uploading it to the output URL.

Source code in vllm/entrypoints/openai/run_batch.py
async def write_file(
    path_or_url: str, batch_outputs: list[BatchRequestOutput], output_tmp_dir: str
) -> None:
    """
    Write batch_outputs to a file or upload to a URL.
    path_or_url: The path or URL to write batch_outputs to.
    batch_outputs: The list of batch outputs to write.
    output_tmp_dir: The directory to store the output file before uploading it
    to the output URL.
    """
    if path_or_url.startswith("http://") or path_or_url.startswith("https://"):
        if output_tmp_dir is None:
            logger.info("Writing outputs to memory buffer")
            output_buffer = StringIO()
            for o in batch_outputs:
                print(o.model_dump_json(), file=output_buffer)
            output_buffer.seek(0)
            logger.info("Uploading outputs to %s", path_or_url)
            await upload_data(
                path_or_url,
                output_buffer.read().strip().encode("utf-8"),
                from_file=False,
            )
        else:
            # Write responses to a temporary file and then upload it to the URL.
            with tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf-8",
                dir=output_tmp_dir,
                prefix="tmp_batch_output_",
                suffix=".jsonl",
            ) as f:
                logger.info("Writing outputs to temporary local file %s", f.name)
                await write_local_file(f.name, batch_outputs)
                logger.info("Uploading outputs to %s", path_or_url)
                await upload_data(path_or_url, f.name, from_file=True)
    else:
        logger.info("Writing outputs to local file %s", path_or_url)
        await write_local_file(path_or_url, batch_outputs)

write_local_file async

write_local_file(
    output_path: str,
    batch_outputs: list[BatchRequestOutput],
) -> None

Write the responses to a local file.

    output_path: The path to write the responses to.
    batch_outputs: The list of batch outputs to write.

Source code in vllm/entrypoints/openai/run_batch.py
async def write_local_file(
    output_path: str, batch_outputs: list[BatchRequestOutput]
) -> None:
    """
    Write the responses to a local file.
    output_path: The path to write the responses to.
    batch_outputs: The list of batch outputs to write.
    """
    # We should make this async, but as long as run_batch runs as a
    # standalone program, blocking the event loop won't affect performance.
    with open(output_path, "w", encoding="utf-8") as f:
        for o in batch_outputs:
            print(o.model_dump_json(), file=f)