
Streaming with SSE

How to consume the live event stream from /api/v1/query/stream in your own client — browser, Node, Python, or curl — with parsing, reconnection, and the handful of pitfalls that bite everyone the first time.

§ 01 What is SSE

Server-Sent Events is a thin protocol on top of HTTP for one-way server-to-client streaming. The server keeps a single response open and writes UTF-8 lines in a tiny structured format. The client reads them as they arrive.
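On the wire, a few seconds of this stream look like the following (the payloads are illustrative; the blank line terminates each message):

```
event: message
data: {"text": "Screening sources..."}

event: sources
data: {"stage": "screened", "count": 14, "items": [...]}
```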

We use SSE for streaming because it's the right tool for this shape of work: the data flows one way, arrives incrementally over a long-lived connection, and needs nothing from the client after the initial request.

Note: If you need bidirectional streaming — say, the user can interrupt or amend the running query — you want WebSocket or HTTP/2 streams instead. Essarion's interactive runtime (used inside Agents) uses WebSocket. The research stream does not need it.

§ 02 Event types

The stream emits a small, fixed set of event types. Every event is a JSON object on a data: line, preceded by an event: line that names the type.

| event | payload | fires |
| --- | --- | --- |
| start | request_id, session_id | Once, immediately on connection. Capture the IDs. |
| message | text | Repeatedly. Status text describing what the engine is doing now. |
| sources | stage (discovered, screened, analyzed), count, items | At each source-pipeline stage. Use stage to update the right UI element. |
| reasoning | text | Repeatedly during synthesis. Partial reasoning chunks; can be appended. |
| final | answer, citations, usage | Once. The full final answer. Stream closes shortly after. |
| error | code, message | On failure. Stream closes after. |
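Given that framing, one raw message block parses into a (type, payload) pair. A minimal sketch in Python:

```python
import json

def parse_sse_message(raw):
    """Parse one SSE message (the lines between blank-line separators)
    into (event_type, payload). The type defaults to "message" when no
    event: line is present, per the SSE spec."""
    event, data_parts = "message", []
    for line in raw.split("\n"):
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].strip())
    payload = json.loads("".join(data_parts)) if data_parts else None
    return event, payload
```

The browser, Node, and Python examples below all implement this same loop, differing only in how they read chunks off the connection.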

§ 03 Browser

The native EventSource API is the obvious choice — but it has a load-bearing limitation: it only does GET. Our endpoint takes a JSON body and a bearer header, so EventSource isn't usable directly.

The recommended browser approach is fetch streaming: a regular fetch() POST whose response body you read incrementally. You parse the SSE format yourself. It's about 30 lines.

browser
async function streamQuery(query, onEvent) {
  // NOTE: in production, point this at your own backend, not api.essarion.com
  // (don't ship the API key to the browser — see /guides/integration/)
  const res = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query }),
  });

  if (!res.ok) throw new Error(`HTTP ${res.status}`);

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE messages are separated by \n\n
    const messages = buffer.split("\n\n");
    buffer = messages.pop(); // last is potentially partial

    for (const msg of messages) {
      let event = "message";
      let data = "";
      for (const line of msg.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) data += line.slice(5).trim();
      }
      if (data) {
        try { onEvent(event, JSON.parse(data)); }
        catch (e) { console.warn("bad SSE payload", data); }
      }
    }
  }
}

streamQuery("What changed in the IRA in 2024?", (event, payload) => {
  if (event === "message")   console.log("status:", payload.text);
  if (event === "sources")   console.log("sources:", payload.stage, payload.count);
  if (event === "reasoning") appendReasoning(payload.text);
  if (event === "final")     renderAnswer(payload.answer, payload.citations);
});

Caution: Don't forget the { stream: true } flag on TextDecoder.decode(). Without it, multi-byte UTF-8 characters at chunk boundaries get mangled.

§ 04 Node

On Node 18+, fetch is built in and works the same way as the browser version. The difference is that you have process.env, so you can attach the bearer token directly.

node
async function* streamQuery(query) {
  const res = await fetch("https://api.essarion.com/api/v1/query/stream", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${process.env.ESSARION_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ query }),
  });

  if (!res.ok) throw new Error(`HTTP ${res.status}`);

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    const messages = buffer.split("\n\n");
    buffer = messages.pop();

    for (const msg of messages) {
      let event = "message", data = "";
      for (const line of msg.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) data += line.slice(5).trim();
      }
      if (data) yield { event, payload: JSON.parse(data) };
    }
  }
}

streamQuery is an async generator, so you can consume it with for await:

node
for await (const { event, payload } of streamQuery("...")) {
  if (event === "final") {
    console.log(payload.answer);
    break;
  }
}

§ 05 Python

The cleanest Python pattern is httpx.AsyncClient.stream() with aiter_lines(), which already splits on newlines. You only have to assemble events from runs of lines.

python
import os, json, httpx

async def stream_query(query):
    headers = {
        "Authorization": f"Bearer {os.environ['ESSARION_KEY']}",
        "Content-Type": "application/json",
    }
    body = {"query": query}

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "https://api.essarion.com/api/v1/query/stream",
            headers=headers,
            json=body,
        ) as resp:
            resp.raise_for_status()

            event = "message"
            data_parts = []
            async for line in resp.aiter_lines():
                if line == "":
                    if data_parts:
                        yield event, json.loads("".join(data_parts))
                    event, data_parts = "message", []
                elif line.startswith("event:"):
                    event = line[6:].strip()
                elif line.startswith("data:"):
                    data_parts.append(line[5:].strip())

            # Flush a final event that wasn't followed by a blank line.
            if data_parts:
                yield event, json.loads("".join(data_parts))

# usage
import asyncio
async def main():
    async for event, payload in stream_query("..."):
        if event == "final":
            print(payload["answer"])
            break

asyncio.run(main())

Tip: Set timeout=None on the AsyncClient for streaming. The default httpx timeout is 5 seconds and will cut you off mid-stream.

§ 06 curl

For quick verification from a shell, curl -N disables output buffering so you can see events as they land:

curl
curl -N \
  -H "Authorization: Bearer $ESSARION_KEY" \
  -H "Content-Type: application/json" \
  -d '{"query":"What changed in the IRA in 2024?"}' \
  https://api.essarion.com/api/v1/query/stream

If you want to watch a particular event type, pipe through grep:

curl
curl -N ... | grep --line-buffered -E '^(event:|data:)'

§ 07 Reconnection & idempotency

SSE connections drop. Mobile networks switch towers, laptops sleep, load balancers recycle. You should plan for this even on a good day.

The strategy that works:

  1. On start, persist the request_id server-side (or in localStorage for browser-only flows).
  2. On disconnect, attempt at most one reconnect by re-issuing the same query.
  3. If you reconnect, use the persisted request_id to dedupe — if the original run already produced a final event, fetch the result from /api/v1/runs/{request_id} instead of starting a new one.
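The steps above can be sketched as follows. Here run_stream and fetch_run are assumed helpers, not part of the API: run_stream yields (event, payload) tuples and raises ConnectionError on a drop; fetch_run GETs /api/v1/runs/{request_id} and returns the final payload, or None if the run hasn't finished.

```python
def query_with_retry(query, run_stream, fetch_run):
    request_id = None
    for attempt in range(2):  # the original attempt plus at most one retry
        try:
            for event, payload in run_stream(query):
                if event == "start":
                    request_id = payload["request_id"]  # persist this (step 1)
                elif event == "final":
                    return payload
        except ConnectionError:
            if attempt == 1:
                raise  # one retry, then surface the failure
            # If the original run already finished, fetch its result
            # instead of issuing a new query (step 3).
            if request_id is not None:
                done = fetch_run(request_id)
                if done is not None:
                    return done
    return None
```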

Caution: Don't auto-retry indefinitely on connection loss. Each retry is a new query that costs tokens. One retry, then surface the failure to the user.

The request_id idempotency contract: re-submitting the same query string while the original run is still in progress will not create a duplicate run. The server returns the existing request_id and resumes streaming from where you reconnected.

§ 08 Common pitfalls

Proxy buffering

If you're relaying SSE through your own server (Pattern C in Integration patterns) and events arrive in big bursts instead of as they happen, your proxy is buffering. The fix depends on the proxy, but most have a directive or response header that disables buffering for a given route.
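With nginx, for example (these are standard nginx directives, nothing Essarion-specific), disable buffering on the location that proxies the stream:

```nginx
location /api/stream {
    proxy_pass http://upstream_relay;   # hypothetical upstream name
    proxy_buffering off;                # flush upstream bytes to the client immediately
}
```

Alternatively, the relay can send an X-Accel-Buffering: no response header, which tells nginx to skip buffering for that response only.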

Gateway timeouts

CautionSeveral PaaS hosts cap request duration at 30 or 60 seconds. Vercel, Cloudflare Workers (free), and AWS API Gateway all have hard limits that will cut your stream short. If your host has a 60-second cap, you'll get a 504 mid-run on long queries.

If you're hitting this, your options are: move the relay to a long-running host (Fly, Render, Railway, EC2, your own k8s), or switch to Pattern B (async) for queries longer than your timeout.

Partial chunk parsing

SSE messages are delimited by \n\n, but a single read() can land in the middle of a message. The pattern in the code samples above — buffer, split on \n\n, pop the last (potentially partial) element back into the buffer — is the load-bearing detail. If you forget it, you'll occasionally see JSON.parse errors on truncated payloads at random.
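That buffering rule can be sketched in a few lines of Python, independent of any HTTP client:

```python
class SSEBuffer:
    """Accumulates raw chunks and emits only complete SSE messages."""

    def __init__(self):
        self.buffer = ""

    def feed(self, chunk):
        self.buffer += chunk
        # Split on the message delimiter; the last element is either ""
        # (the chunk ended exactly on \n\n) or a partial message, so it
        # goes back into the buffer either way.
        *complete, self.buffer = self.buffer.split("\n\n")
        return complete
```

Feeding a message split across two chunks yields nothing on the first feed and the complete message on the second — which is exactly the behavior you want at chunk boundaries.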

§ 09 Where to go next