Queue Status
This page documents advanced/queue_status.py in mint-quickstart.
What this demo does
- Creates a sampling session using the low-level `AsyncTinker` client.
- Submits a sample request with the `X-Tinker-Sampling-Backpressure: 1` header.
- Polls `futures.with_raw_response.retrieve()` in a loop.
- On HTTP 408 (request pending), reads queue status from both the response body and headers:
  - Body fields: `queue_state`, `queue_position`, `queue_depth`, `estimated_wait_s`, `progress`
  - Headers: `Retry-After`, `X-Queue-Depth`, `X-Queue-Position`, `X-Queue-ETA-S`, `X-Queue-Status`
- On HTTP 200, prints the result keys and exits.
When to use
- When you need to monitor queue position for long-running sampling requests.
- When building UIs that display “position in queue” or “estimated wait time”.
- When debugging backpressure behavior in production deployments.
Expected output
sampling_session_created sampling_session_id=abc123
request_submitted request_id=req_456
queue_fields request_id='req_456' type='pending' status='queued' queue_state='waiting' ...
queue_fields request_id='req_456' type='pending' status='queued' queue_state='processing' ...
result_received request_id=req_456 keys=['sequences', 'request_id', ...]
Prerequisites
- Python >= 3.11
- `MINT_API_KEY` or `TINKER_API_KEY` set (optional if server auth is disabled)
How to run
export MINT_API_KEY=sk-...
python advanced/queue_status.py
Parameters (env vars)
- `MINT_API_KEY` or `TINKER_API_KEY`: API key for authentication
- `MINT_BASE_URL` or `TINKER_BASE_URL`: server endpoint (default: `https://mint.macaron.xin/`)
- `MINT_BASE_MODEL`: base model to sample from (default: `Qwen/Qwen3-0.6B`)
Full script
#!/usr/bin/env python3
"""Queue status polling — read pending queue fields on 408 via raw response.
Demonstrates the backpressure API: submit a sample request, then poll
with `futures.with_raw_response.retrieve()` to read queue depth,
position, ETA, and status from both the response body and headers.
Env:
MINT_API_KEY or TINKER_API_KEY (optional if server auth disabled)
MINT_BASE_URL or TINKER_BASE_URL (optional, default mint)
MINT_BASE_MODEL (optional, default Qwen/Qwen3-0.6B)
Run:
python advanced/queue_status.py
"""
from __future__ import annotations
import asyncio
import inspect
import os
import sys
import time
from pathlib import Path
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comment lines, and lines without ``=`` are skipped.
    A leading ``export `` is accepted and ignored. Variables that are already
    present in the environment are never overwritten. A missing file is a
    no-op.

    Args:
        path: Location of the env file to read (UTF-8).
    """
    if not path.exists():
        return
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped.startswith("export "):
            stripped = stripped[len("export "):].lstrip()
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, value = stripped.partition("=")
        if not sep:
            continue
        key = key.strip()
        value = value.strip()
        # Remove exactly one *matching* pair of surrounding quotes. Stripping
        # quote characters unconditionally would corrupt values that merely
        # start or end with a quote (e.g. a secret ending in an apostrophe).
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        if key and key not in os.environ:
            os.environ[key] = value
# Two levels up from this file: the script's directory, then its parent.
REPO_ROOT = Path(__file__).resolve().parents[1]
load_env_file(REPO_ROOT / ".env")

# Candidate toolkit source directories, in priority order: first next to the
# repository, then inside it; the "-alpha" checkout is tried before the
# regular one at each location.
_toolkit_candidates = (
    base_dir / src_dir
    for base_dir in (REPO_ROOT.parent, REPO_ROOT)
    for src_dir in ("mindlab-toolkit-alpha/src", "mindlab-toolkit/src")
)
# Put the first existing candidate that is not yet on sys.path at the front
# of the import path, then stop looking.
for mint_src in _toolkit_candidates:
    if mint_src.exists() and str(mint_src) not in sys.path:
        sys.path.insert(0, str(mint_src))
        break
import mint as _mint # triggers env sync (MINT_* -> TINKER_*)
import tinker
from tinker._client import AsyncTinker
from tinker import types
def _int_header(headers, name: str) -> int:
value = headers.get(name)
if value is None:
raise RuntimeError(f"missing header {name}")
try:
return int(value)
except Exception as exc:
raise RuntimeError(f"header {name} is not int: {value!r}") from exc
def _maybe_int_header(headers, name: str) -> int | None:
value = headers.get(name)
if value is None:
return None
try:
return int(value)
except Exception as exc:
raise RuntimeError(f"header {name} is not int: {value!r}") from exc
def _maybe_float_header(headers, name: str) -> float | None:
value = headers.get(name)
if value is None:
return None
try:
return float(value)
except Exception as exc:
raise RuntimeError(f"header {name} is not float: {value!r}") from exc
async def main() -> None:
    """Submit one sample request and poll its future, printing queue status.

    Steps:
        1. Create a sampling session for ``MINT_BASE_MODEL``
           (default ``Qwen/Qwen3-0.6B``).
        2. Submit a sample request with the
           ``X-Tinker-Sampling-Backpressure: 1`` header.
        3. Poll ``futures.with_raw_response.retrieve()`` until HTTP 200,
           then print the first result keys and return.
        4. On each HTTP 408, print the queue fields found in the body and
           in the headers, then sleep for ``Retry-After`` seconds
           (at least one second) before polling again.

    Raises:
        RuntimeError: On a status other than 200/408, on a 408 whose
            ``Retry-After`` header is missing or not an integer, on a
            malformed ``X-Queue-*`` numeric header, or when the body's
            ``request_id`` differs from the submitted one.
    """
    base_model = os.environ.get("MINT_BASE_MODEL", "Qwen/Qwen3-0.6B")
    session_id = f"queue_status_{int(time.time())}"

    async with AsyncTinker() as client:
        # 1. Create sampling session
        create_req = types.CreateSamplingSessionRequest(
            session_id=session_id,
            sampling_session_seq_id=0,
            base_model=base_model,
        )
        create_resp = await client.service.create_sampling_session(request=create_req)
        sampling_session_id = create_resp.sampling_session_id
        print(
            "sampling_session_created",
            f"sampling_session_id={sampling_session_id}",
            flush=True,
        )

        # 2. Submit sample request with backpressure header
        sample_req = types.SampleRequest(
            sampling_session_id=sampling_session_id,
            seq_id=1,
            num_samples=1,
            prompt=types.ModelInput.from_ints([1] * 256),
            sampling_params=types.SamplingParams(
                max_tokens=128,
                temperature=0.7,
                top_k=-1,
                top_p=1.0,
            ),
        )
        future = await client.sampling.asample(
            request=sample_req,
            extra_headers={"X-Tinker-Sampling-Backpressure": "1"},
        )
        request_id = future.request_id
        print("request_submitted", f"request_id={request_id}", flush=True)

        # 3. Poll until result or unexpected error
        while True:
            try:
                raw = await client.futures.with_raw_response.retrieve(
                    request=types.FutureRetrieveRequest(
                        request_id=request_id,
                    )
                )
                status_code = raw.status_code
                body = raw.json()
                # json() may be sync or async depending on the response
                # object; await only when it handed back an awaitable.
                if inspect.isawaitable(body):
                    body = await body
                headers = raw.headers
            except tinker.APIStatusError as e:
                # Non-success statuses arrive as exceptions; recover the
                # same (status, headers, body) triple from the error.
                status_code = e.status_code
                headers = e.response.headers if e.response is not None else {}
                if isinstance(e.body, dict):
                    body = e.body
                else:
                    try:
                        body = e.response.json()
                    except Exception:
                        # Best effort: body is only used for reporting.
                        body = {"error": str(e)}

            if status_code == 200:
                if isinstance(body, dict):
                    print(
                        "result_received",
                        f"request_id={request_id}",
                        f"keys={list(body)[:8]}",
                        flush=True,
                    )
                else:
                    print("result_received", f"request_id={request_id}", flush=True)
                return

            if status_code != 408:
                raise RuntimeError(
                    f"unexpected status {status_code} body={str(body)[:500]!r}"
                )

            # 4. Parse queue status from 408 response
            retry_after_s = _int_header(headers, "Retry-After")
            queue_depth = _maybe_int_header(headers, "X-Queue-Depth")
            queue_position = _maybe_int_header(headers, "X-Queue-Position")
            queue_eta_s = _maybe_float_header(headers, "X-Queue-ETA-S")
            queue_status = headers.get("X-Queue-Status")

            # Both paths above can yield a JSON value that is not an object.
            # Fall back to an empty mapping so the header-derived fields are
            # still reported instead of failing with AttributeError.
            fields = body if isinstance(body, dict) else {}
            print(
                "queue_fields",
                f"request_id={fields.get('request_id')!r}",
                f"type={fields.get('type')!r}",
                f"status={fields.get('status')!r}",
                f"queue_state={fields.get('queue_state')!r}",
                f"queue_state_reason={fields.get('queue_state_reason')!r}",
                f"queue_depth={fields.get('queue_depth')!r}",
                f"queue_position={fields.get('queue_position')!r}",
                f"estimated_wait_s={fields.get('estimated_wait_s')!r}",
                f"progress={fields.get('progress')!r}",
                f"retry_after_s={fields.get('retry_after_s')!r}",
                f"x_queue_depth={queue_depth!r}",
                f"x_queue_position={queue_position!r}",
                f"x_queue_status={queue_status!r}",
                f"x_queue_eta_s={queue_eta_s!r}",
                f"retry_after={retry_after_s!r}",
                flush=True,
            )

            # Sanity check: a non-empty request_id in the body must be ours.
            body_request_id = fields.get("request_id")
            if (
                isinstance(body_request_id, str)
                and body_request_id
                and body_request_id != request_id
            ):
                raise RuntimeError(
                    f"request_id mismatch: body={body_request_id!r} "
                    f"expected {request_id!r}"
                )

            # Honour the server's hint, but never poll more than once a second.
            await asyncio.sleep(max(1, retry_after_s))
if __name__ == "__main__":
    asyncio.run(main())

Key concepts
Backpressure header
Setting X-Tinker-Sampling-Backpressure: 1 tells the server to return 408 with queue info instead of blocking the connection when the request is still pending.
Queue fields (408 body)
| Field | Type | Description |
|---|---|---|
| `queue_state` | string | Current state: `waiting`, `processing`, etc. |
| `queue_position` | int | Position in the queue (0-indexed) |
| `queue_depth` | int | Total requests in the queue |
| `estimated_wait_s` | float | Estimated wait time in seconds |
| `retry_after_s` | float | Suggested poll interval |
Queue headers
| Header | Type | Description |
|---|---|---|
| `Retry-After` | int | Seconds before next poll |
| `X-Queue-Depth` | int | Total queue depth |
| `X-Queue-Position` | int | Position in queue |
| `X-Queue-ETA-S` | float | Estimated wait (seconds) |
| `X-Queue-Status` | string | Queue status string |