Streaming
Stream chat completions in real-time using Server-Sent Events (SSE).
When streaming is enabled, the Chat Completions API delivers the response as a series of events, so your application can display content as it is generated.
Enabling Streaming
Set stream: true in your request body:
curl https://api.persly.ai/v1/chat/completions \
  -H "Authorization: Bearer $PERSLY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "persly-chat-v1",
    "messages": [{"role": "user", "text": "Explain hypertension treatment options."}],
    "stream": true
  }'

SSE Protocol
The streaming response uses the text/event-stream content type. Each event is prefixed with data: and followed by two newlines. Every event except the final [DONE] termination signal is a JSON object with a type field that determines the payload structure.
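The framing described above can be sketched as a small parser. This is a minimal illustration, not the official client: the function name `iter_sse_events` is hypothetical, and it assumes each event arrives on a single `data:` line, as in the examples on this page.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from an iterable of SSE lines.

    Hypothetical helper: assumes one `data:` line per event.
    """
    for line in lines:
        line = line.rstrip("\r\n")
        # Blank separator lines and anything without the data: prefix are skipped.
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # termination signal, not JSON
        yield json.loads(payload)


sample = [
    'data: {"type":"message","content":"Hypertension"}',
    "",
    'data: {"type":"message","content":" treatment"}',
    "",
    "data: [DONE]",
]
events = list(iter_sse_events(sample))
print([event["type"] for event in events])
```

Keeping the framing logic separate from event handling makes it easy to feed the same generator from `response.iter_lines()` or from recorded test fixtures.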
Event Types
| type | Delivery | Description |
|---|---|---|
| steps | Snapshot (replace) | Current state of all processing steps |
| message | Delta (append) | Response text chunk |
| sources | Snapshot (replace) | Source citations |
| follow_up_questions | Snapshot (replace) | Follow-up suggestions |
| error | Terminal | Streaming error |
Snapshot events contain the full current value — always replace the previous value. Delta events contain a fragment — append to the accumulated value.
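The replace-versus-append rule can be expressed as a single reducer. The sketch below is illustrative only: `apply_event` and `SNAPSHOT_FIELDS` are hypothetical names, and it relies on the convention visible in the examples on this page that a snapshot event stores its payload under a key matching its type.

```python
SNAPSHOT_FIELDS = {"steps", "sources", "follow_up_questions"}


def apply_event(state: dict, event: dict) -> dict:
    """Fold one decoded event into the accumulated state (hypothetical helper)."""
    kind = event.get("type")
    if kind in SNAPSHOT_FIELDS:
        # Snapshot: the event carries the full current value, so overwrite.
        state[kind] = event[kind]
    elif kind == "message":
        # Delta: the event carries a fragment, so append to what we have.
        state["message"] = state.get("message", "") + event["content"]
    # Any other type is left untouched here; error handling is the caller's job.
    return state


state = {}
for event in [
    {"type": "steps", "steps": [{"description": "Searching", "actions": []}]},
    {"type": "message", "content": "Hypertension"},
    {"type": "steps", "steps": [{"description": "Generating response", "actions": []}]},
    {"type": "message", "content": " treatment"},
]:
    apply_event(state, event)

print(state["message"])
print(len(state["steps"]))
```

Note that the second steps event replaces the first rather than extending it, while the two message fragments are concatenated.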
Steps Events
Each steps event contains the full current steps[] array. As processing progresses, new steps and actions are added to the array. Always replace the previous steps value with the latest event.
data: {"type":"steps","steps":[{"description":"Searching medical knowledge base","actions":[]}]}

data: {"type":"steps","steps":[{"description":"Searching medical knowledge base","actions":[{"type":"search_official_source","input":{"query":"hypertension treatment"},"result":[{"title":"JNC 8 Guidelines","url":"https://...","content":"..."}]}]}]}

data: {"type":"steps","steps":[{"description":"Searching medical knowledge base","actions":[{"type":"search_official_source","input":{"query":"hypertension treatment"},"result":[{"title":"JNC 8 Guidelines","url":"https://...","content":"..."}]}]},{"description":"Generating response","actions":[]}]}

Message Events
Message events contain partial text fragments of the AI response. Concatenate all content values to build the full message:
data: {"type":"message","content":"Hypertension"}

data: {"type":"message","content":" treatment typically begins with"}

Sources Event
Emitted after message generation completes. Contains the full list of source citations:
data: {"type":"sources","sources":[{"title":"Hypertension Guidelines - JNC 8","url":"https://...","relevance_score":0.92}]}

Returns [] when no sources are found.
Follow-up Questions Event
Emitted when include_follow_ups: true:
data: {"type":"follow_up_questions","follow_up_questions":["What are the causes of hypertension?","How is hypertension diagnosed?"]}

Returns [] when included but none are generated.
Error Event
If streaming fails during generation, an error event is emitted:
data: {"type":"error","error":{"type":"server_error","code":"internal_error","message":"AI processing failed"}}

In this error path, the stream terminates with [DONE] without emitting further events. Because the response is already underway when the failure occurs, the HTTP status remains 200; errors are reported in-band through SSE.
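Because the HTTP status stays 200, a client has to inspect each event to notice a failure. One way to surface in-band errors is sketched below; `StreamError` and `raise_if_error` are hypothetical names, and the field names are taken from the example event above.

```python
class StreamError(RuntimeError):
    """Raised when an in-band error event is received (hypothetical class)."""

    def __init__(self, error: dict):
        # Keep the individual fields so callers can branch on them.
        self.error_type = error.get("type", "unknown")
        self.code = error.get("code", "unknown")
        self.detail = error.get("message", "")
        super().__init__(f"[{self.code}] {self.detail}")


def raise_if_error(event: dict) -> None:
    """Raise StreamError for error events; do nothing for any other type."""
    if event.get("type") == "error":
        raise StreamError(event.get("error", {}))


event = {
    "type": "error",
    "error": {
        "type": "server_error",
        "code": "internal_error",
        "message": "AI processing failed",
    },
}

try:
    raise_if_error(event)
    caught = None
except StreamError as exc:
    caught = exc
    print(f"stream failed: {exc.code}")
```

Calling such a check before any other per-event processing keeps partial output from being treated as a complete answer.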
Termination Signal
data: [DONE]

Event Order
Typical success-path order:
1. steps (emitted multiple times as processing progresses)
2. message chunks (multiple)
3. sources (always emitted, [] when none found)
4. follow_up_questions (only when include_follow_ups: true, [] when none generated)
5. [DONE]
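For debugging or logging, the typical order above can be checked against the sequence of event types a client actually received. This is only a sketch: `follows_typical_order` is a hypothetical helper, and since the order is described as typical rather than guaranteed, a mismatch should be logged rather than treated as a failure.

```python
from typing import Sequence

TYPICAL_ORDER = ["steps", "message", "sources", "follow_up_questions"]


def follows_typical_order(event_types: Sequence[str]) -> bool:
    """Return True if known event types never move backwards in TYPICAL_ORDER."""
    rank = {name: index for index, name in enumerate(TYPICAL_ORDER)}
    last = -1
    for name in event_types:
        if name not in rank:
            continue  # unknown types are ignored
        if rank[name] < last:
            return False
        last = rank[name]
    return True


received = ["steps", "steps", "message", "message", "sources"]
print(follows_typical_order(received))
```

Repeated events of the same type are accepted, which matches the multiple steps snapshots and message chunks described in the list.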
Reconstructing the Response Object
Streaming delivers the same data as a non-streaming response, split across SSE events. Collect the events to build the identical response object:
| SSE Event | Delivery | Maps to |
|---|---|---|
| steps | Replace | steps |
| message | Append content | message |
| sources | Replace | sources |
| follow_up_questions | Replace | follow_up_questions |
For complete error envelope details (error vs 422 detail[]), see Error Handling.
import httpx
import json

result = {"steps": [], "message": "", "sources": None, "follow_up_questions": None}

with httpx.stream(
    "POST",
    "https://api.persly.ai/v1/chat/completions",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={
        "model": "persly-chat-v1",
        "messages": [{"role": "user", "text": "Explain hypertension treatment."}],
        "stream": True,
    },
) as response:
    # Errors raised before streaming starts arrive as a regular JSON body.
    if response.status_code != 200:
        response.read()
        payload = response.json()
        if response.status_code == 422:
            first = (payload.get("detail") or [{}])[0]
            raise RuntimeError(
                f"Validation error [{first.get('type', 'value_error')}]: "
                f"{first.get('msg', 'Invalid request parameters')}"
            )
        error = payload.get("error", {})
        raise RuntimeError(
            f"API error [{error.get('code', 'unknown')}]: "
            f"{error.get('message', 'Request failed')}"
        )

    for line in response.iter_lines():
        if not line or not line.startswith("data: "):
            continue
        data = line[6:]
        if data == "[DONE]":
            break
        chunk = json.loads(data)
        if chunk["type"] == "error":
            raise RuntimeError(
                f"Streaming error [{chunk['error']['code']}]: "
                f"{chunk['error']['message']}"
            )
        elif chunk["type"] == "steps":
            result["steps"] = chunk["steps"]  # snapshot: replace
        elif chunk["type"] == "message":
            result["message"] += chunk["content"]  # delta: append
        elif chunk["type"] == "sources":
            result["sources"] = chunk["sources"] or None
        elif chunk["type"] == "follow_up_questions":
            result["follow_up_questions"] = chunk["follow_up_questions"] or None

# `result` is now identical to a non-streaming response
print(result["message"])
for source in (result["sources"] or []):
    print(f" Source: {source['title']}: {source['url']}")

const result = { steps: [], message: "", sources: null, follow_up_questions: null };
const response = await fetch("https://api.persly.ai/v1/chat/completions", {
  method: "POST",
  headers: {
    Authorization: "Bearer YOUR_API_KEY",
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    model: "persly-chat-v1",
    messages: [{ role: "user", text: "Explain hypertension treatment." }],
    stream: true,
  }),
});

// Errors raised before streaming starts arrive as a regular JSON body.
if (!response.ok) {
  const payload = await response.json();
  if (response.status === 422) {
    const first = payload.detail?.[0];
    throw new Error(
      `Validation error [${first?.type ?? "value_error"}]: ${first?.msg ?? "Invalid request parameters"}`
    );
  } else {
    const error = payload?.error ?? {};
    throw new Error(
      `API error [${error.code ?? "unknown"}]: ${error.message ?? "Request failed"}`
    );
  }
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let streamDone = false;

while (!streamDone) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // Keep the last, possibly incomplete, line in the buffer for the next read.
  const lines = buffer.split("\n");
  buffer = lines.pop();
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const data = line.slice(6);
    if (data === "[DONE]") {
      streamDone = true;
      break;
    }
    const chunk = JSON.parse(data);
    switch (chunk.type) {
      case "error":
        throw new Error(
          `Streaming error [${chunk.error.code}]: ${chunk.error.message}`
        );
      case "steps":
        result.steps = chunk.steps; // snapshot: replace
        break;
      case "message":
        result.message += chunk.content; // delta: append
        break;
      case "sources":
        result.sources = chunk.sources.length > 0 ? chunk.sources : null;
        break;
      case "follow_up_questions":
        result.follow_up_questions =
          chunk.follow_up_questions.length > 0 ? chunk.follow_up_questions : null;
        break;
    }
  }
}

// `result` is now identical to a non-streaming response
console.log(result.message);
for (const source of result.sources ?? []) {
  console.log(` Source: ${source.title}: ${source.url}`);
}

Key Considerations
- Content-Type: The response header is text/event-stream
- Snapshot vs Delta: steps, sources, follow_up_questions are snapshot events (replace previous value); message is the only delta event (append content)
- Error path: Runtime stream failures are sent as type: "error" events; clients should handle this before processing other event types
- Forward compatibility: Clients should ignore unknown type values and unknown fields
- Sources: Always emitted as a sources event; [] when none are found
- Follow-up questions: Only emitted when include_follow_ups: true; [] when included but none generated
- Billing: Streaming and non-streaming use the same pricing rules; follow-up surcharge applies in both modes only when follow-up generation is actually invoked