Streaming with SSE.
How to consume the live event stream from /api/v1/query/stream in your own client — browser, Node, Python, or curl — with parsing, reconnection, and the handful of pitfalls that bite everyone the first time.
§ 01 What is SSE
Server-Sent Events is a thin protocol on top of HTTP for one-way server-to-client streaming. The server keeps a single response open and writes UTF-8 lines in a tiny structured format. The client reads them as they arrive.
We use SSE for streaming because it's the right tool for this shape of work:
- One-way is enough. The client doesn't need to send anything after the initial query — only receive events.
- Browsers handle reconnection. The native `EventSource` reconnects automatically on transient failures.
- It's simpler than WebSocket. No upgrade handshake, no framing protocol; it's just a long-lived HTTP response.
- Proxies understand it. Most reverse proxies and CDNs handle `text/event-stream` correctly without configuration changes.
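On the wire, each message is just a couple of prefixed lines followed by a blank line. A minimal sketch of that framing in Python (the `sse_frame` helper name is ours, for illustration; it is not part of any SDK):

```python
import json

def sse_frame(event, payload):
    # One SSE message: an "event:" line naming the type, a "data:" line
    # carrying the JSON payload, and a blank line terminating the message.
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"
```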
§ 02 Event types
The stream emits a small, fixed set of event types. Every event is a JSON object on a `data:` line, preceded by an `event:` line that names the type.
| event | payload | fires |
|---|---|---|
| `start` | `request_id`, `session_id` | Once, immediately on connection. Capture the IDs. |
| `message` | `text` | Repeatedly. Status text describing what the engine is doing now. |
| `sources` | `stage` (`discovered`, `screened`, `analyzed`), `count`, `items` | At each source-pipeline stage. Use `stage` to update the right UI element. |
| `reasoning` | `text` | Repeatedly during synthesis. Partial reasoning chunks; can be appended. |
| `final` | `answer`, `citations`, `usage` | Once. The full final answer. Stream closes shortly after. |
| `error` | `code`, `message` | On failure. Stream closes after. |
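Put together, a successful run looks roughly like this on the wire (payloads abbreviated and illustrative, not literal API output):

```
event: start
data: {"request_id": "req_123", "session_id": "sess_456"}

event: message
data: {"text": "Searching sources..."}

event: sources
data: {"stage": "discovered", "count": 12, "items": [...]}

event: reasoning
data: {"text": "Based on the 2024 amendments..."}

event: final
data: {"answer": "...", "citations": [...], "usage": {...}}
```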
§ 03 Browser
The native EventSource API is the obvious choice — but it has a load-bearing limitation: it only does GET. Our endpoint takes a JSON body and a bearer header, so EventSource isn't usable directly.
The recommended browser approach is fetch streaming: a regular fetch() POST whose response body you read incrementally. You parse the SSE format yourself. It's about 30 lines.
```js
async function streamQuery(query, onEvent) {
  // NOTE: in production, point this at your own backend, not api.essarion.com
  // (don't ship the API key to the browser; see /guides/integration/)
  const res = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query }),
  });
  if (!res.ok) throw new Error(`HTTP ${res.status}`);
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // SSE messages are separated by \n\n
    const messages = buffer.split("\n\n");
    buffer = messages.pop(); // last is potentially partial
    for (const msg of messages) {
      let event = "message";
      let data = "";
      for (const line of msg.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) data += line.slice(5).trim();
      }
      if (data) {
        try { onEvent(event, JSON.parse(data)); }
        catch (e) { console.warn("bad SSE payload", data); }
      }
    }
  }
}
```
```js
streamQuery("What changed in the IRA in 2024?", (event, payload) => {
  if (event === "message") console.log("status:", payload.text);
  if (event === "sources") console.log("sources:", payload.stage, payload.count);
  if (event === "reasoning") appendReasoning(payload.text);
  if (event === "final") renderAnswer(payload.answer, payload.citations);
});
```
Don't forget the `{ stream: true }` flag on `TextDecoder.decode()`. Without it, multi-byte UTF-8 characters at chunk boundaries get mangled.

§ 04 Node
On Node 18+, fetch is built in and works the same way as the browser version. The difference is that you have process.env, so you can attach the bearer token directly.
```js
async function* streamQuery(query) {
  const res = await fetch("https://api.essarion.com/api/v1/query/stream", {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${process.env.ESSARION_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ query }),
  });
  if (!res.ok) throw new Error(`HTTP ${res.status}`);
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const messages = buffer.split("\n\n");
    buffer = messages.pop();
    for (const msg of messages) {
      let event = "message", data = "";
      for (const line of msg.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) data += line.slice(5).trim();
      }
      if (data) yield { event, payload: JSON.parse(data) };
    }
  }
}
```
Because it yields events, declare it as an async generator (`async function*`) and consume it with `for await`:
```js
for await (const { event, payload } of streamQuery("...")) {
  if (event === "final") {
    console.log(payload.answer);
    break;
  }
}
```
§ 05 Python
The cleanest Python pattern is httpx.AsyncClient.stream() with aiter_lines(), which already splits on newlines. You only have to assemble events from runs of lines.
```python
import os, json, httpx

async def stream_query(query):
    headers = {
        "Authorization": f"Bearer {os.environ['ESSARION_KEY']}",
        "Content-Type": "application/json",
    }
    body = {"query": query}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "https://api.essarion.com/api/v1/query/stream",
            headers=headers,
            json=body,
        ) as resp:
            resp.raise_for_status()
            event = "message"
            data_parts = []
            async for line in resp.aiter_lines():
                if line == "":
                    if data_parts:
                        yield event, json.loads("".join(data_parts))
                    event, data_parts = "message", []
                elif line.startswith("event:"):
                    event = line[6:].strip()
                elif line.startswith("data:"):
                    data_parts.append(line[5:].strip())
```
Usage:

```python
import asyncio

async def main():
    async for event, payload in stream_query("..."):
        if event == "final":
            print(payload["answer"])
            break

asyncio.run(main())
```
Set `timeout=None` on the `AsyncClient` for streaming. The default httpx timeout is 5 seconds and will cut you off mid-stream.

§ 06 curl
For quick verification from a shell, curl -N disables output buffering so you can see events as they land:
```shell
curl -N \
  -H "Authorization: Bearer $ESSARION_KEY" \
  -H "Content-Type: application/json" \
  -d '{"query":"What changed in the IRA in 2024?"}' \
  https://api.essarion.com/api/v1/query/stream
```
If you want to watch a particular event type, pipe through grep:
```shell
curl -N ... | grep --line-buffered -E '^(event:|data:)'
```
§ 07 Reconnection & idempotency
SSE connections drop. Mobile networks switch towers, laptops sleep, load balancers recycle. You should plan for this even on a good day.
The strategy that works:
- On `start`, persist the `request_id` server-side (or in `localStorage` for browser-only flows).
- On disconnect, attempt at most one reconnect by re-issuing the same query.
- If you reconnect, use the persisted `request_id` to dedupe: if the original run already produced a `final` event, fetch the result from `/api/v1/runs/{request_id}` instead of starting a new one.
The request_id idempotency contract: re-submitting the same query string while the original run is still in progress will not create a duplicate run. The server returns the existing request_id and resumes streaming from where you reconnected.
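The decision part of that strategy can live in a small pure function. A sketch in Python (the function name and the persisted-state shape are our own, not from an SDK):

```python
def plan_reconnect(persisted, disconnect_count):
    """Decide the next action after an SSE disconnect: at most one
    reconnect, and dedupe completed runs via request_id.

    persisted: state saved when the `start` event arrived,
               e.g. {"request_id": "...", "saw_final": False}
    disconnect_count: how many times this run has already dropped.
    """
    rid = persisted["request_id"]
    if persisted.get("saw_final"):
        # Original run finished: fetch /api/v1/runs/{request_id}
        # instead of starting a new run.
        return ("fetch_result", rid)
    if disconnect_count < 1:
        # First drop: re-issue the same query once.
        return ("reconnect", rid)
    return ("give_up", rid)
```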
§ 08 Common pitfalls
Proxy buffering
If you're relaying SSE through your own server (Pattern C in Integration patterns) and events arrive in big bursts instead of as they happen, your proxy is buffering. The fix depends on the proxy:
- Nginx: set `proxy_buffering off` and `proxy_cache off` on the streaming location.
- Cloudflare: SSE works by default, but Workers in front of it can buffer. Use `ReadableStream.tee()` rather than `await response.text()`.
- Express / Node: call `res.flushHeaders()` immediately, and `res.write()` chunks rather than `res.send()`.
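For Nginx, a minimal streaming location might look like this (the `/api/stream` path and `backend` upstream name are illustrative; adapt them to your relay's routes):

```nginx
location /api/stream {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_cache off;
    # SSE responses are long-lived; don't let the read timeout kill them
    proxy_read_timeout 3600s;
    proxy_http_version 1.1;
}
```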
Gateway timeouts
Serverless platforms and many managed gateways cap how long a response can stay open (often 30 to 60 seconds) and will cut a long-lived stream off with a 504. If you're hitting this, your options are: move the relay to a long-running host (Fly, Render, Railway, EC2, your own k8s), or switch to Pattern B (async) for queries longer than your timeout.
Partial chunk parsing
SSE messages are delimited by `\n\n`, but a single `read()` can land in the middle of a message. The pattern in the code samples above (buffer, split on `\n\n`, pop the last, potentially partial, element back into the buffer) is the load-bearing detail. If you forget it, you'll occasionally see `JSON.parse` errors on truncated payloads, seemingly at random.
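To see why the buffer matters, here is the same pattern as a standalone Python function you can exercise with deliberately misaligned chunks (a sketch; the real samples above read from the network rather than a list, and a production version would also use an incremental UTF-8 decoder):

```python
def parse_sse_chunks(chunks):
    """Parse SSE messages from an iterable of byte chunks whose
    boundaries may fall anywhere, even mid-payload."""
    buffer = ""
    events = []
    for chunk in chunks:
        buffer += chunk.decode("utf-8")
        messages = buffer.split("\n\n")
        buffer = messages.pop()  # keep the possibly-partial tail for next read
        for msg in messages:
            event, data = "message", []
            for line in msg.split("\n"):
                if line.startswith("event:"):
                    event = line[6:].strip()
                elif line.startswith("data:"):
                    data.append(line[5:].strip())
            if data:
                events.append((event, "".join(data)))
    return events
```

Feeding it a stream split in the middle of a JSON payload still yields a clean event, which is exactly the failure mode the naive version gets wrong.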
§ 09 Where to go next
- Wire streaming into a real product → Pattern C — streaming relay
- Endpoint reference → API · Streaming
- Pull a completed timeline → API · Run timelines