# Streaming
## Overview

Streaming lets you receive LLM output as it is generated, chunk by chunk, instead of waiting for the entire response to complete. This is essential for chat interfaces where perceived latency matters — the user sees tokens appear in real time.
Prompty wraps the raw SDK stream in a `PromptyStream` (or `AsyncPromptyStream`) that accumulates chunks for tracing, then hands them to the processor, which extracts usable content deltas and yields them to your application.
```mermaid
flowchart LR
  A["SDK Stream"] --> B["PromptyStream\naccumulates chunks\ntraces on exhaust"]
  B --> C["Processor\nextracts delta.content\nyields text chunks"]
  C --> D["Application"]
  style A fill:#e5e7eb,stroke:#6b7280,color:#374151
  style B fill:#1d4ed8,stroke:#3b82f6,color:#fff
  style C fill:#10b981,stroke:#059669,color:#fff
  style D fill:#f59e0b,stroke:#d97706,color:#fff
```
## Enabling Streaming

Set `stream: true` in the model's `additionalProperties` inside the `options` block. You can do this in the `.prompty` file or override it at runtime.
### In the .prompty file

```prompty
---
name: streaming-chat
model:
  id: gpt-4o-mini
  provider: openai
  apiType: chat
  connection:
    kind: key
    endpoint: ${env:OPENAI_API_BASE:https://api.openai.com/v1}
    apiKey: ${env:OPENAI_API_KEY}
  options:
    temperature: 0.7
    additionalProperties:
      stream: true
---
system:
You are a helpful assistant.

user:
{{question}}
```

### At runtime
```python
from prompty import load

agent = load("chat.prompty")

# Enable streaming by mutating options before execution
agent.model.options.additionalProperties["stream"] = True
```

```typescript
import { load } from "@prompty/core";

const agent = load("chat.prompty");

// Enable streaming by mutating options before execution
agent.model.options!.additionalProperties!.stream = true;
```

## Consuming Streams
```python
from prompty import load, run, process

agent = load("chat.prompty")
agent.model.options.additionalProperties["stream"] = True

# run with raw=True returns the PromptyStream
stream = run(agent, inputs={"question": "Tell me a joke"}, raw=True)

# process yields text chunks
for chunk in process(agent, stream):
    print(chunk, end="", flush=True)
print()  # newline after stream completes
```

```python
import asyncio
from prompty import load_async, run_async, process_async

async def main():
    agent = await load_async("chat.prompty")
    agent.model.options.additionalProperties["stream"] = True

    stream = await run_async(
        agent, inputs={"question": "Tell me a joke"}, raw=True
    )

    async for chunk in process_async(agent, stream):
        print(chunk, end="", flush=True)
    print()

asyncio.run(main())
```

```typescript
// `process` is aliased on import so it does not shadow Node's global
// `process`, which is used for stdout below.
import { load, run, process as processStream } from "@prompty/core";

const agent = load("chat.prompty");
agent.model.options!.additionalProperties!.stream = true;

// run with raw: true returns the PromptyStream
const stream = await run(agent, { question: "Tell me a joke" }, { raw: true });

// processStream yields text chunks
for await (const chunk of await processStream(agent, stream)) {
  process.stdout.write(String(chunk));
}
console.log();
```

## What the Processor Handles

The streaming processor does more than just forward chunks. It handles several edge cases from the OpenAI streaming protocol:
| Scenario | Behavior |
|---|---|
| Content deltas | Each `delta.content` string is yielded directly to the caller. |
| Tool-call deltas | Argument fragments are accumulated across chunks. A complete `ToolCall` object is yielded when the stream ends. |
| Refusal | If `delta.refusal` is present, the processor raises a `ValueError` with the refusal text. |
| Empty / heartbeat chunks | Chunks with no content, tool-call, or refusal data are silently skipped. |
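The rules in the table can be sketched as a small generator. This is an illustrative sketch assuming OpenAI-style chunk dictionaries, not Prompty's actual processor; the `ToolCall` shape here is a stand-in:

```python
from dataclasses import dataclass

@dataclass
class ToolCall:
    name: str = ""
    arguments: str = ""  # JSON fragments accumulated across chunks

def process_chunks(chunks):
    """Yield content strings; yield one accumulated ToolCall at stream end."""
    tool_call = None
    for chunk in chunks:
        delta = chunk["choices"][0]["delta"]
        if delta.get("refusal"):
            # Refusals surface as an error to the caller
            raise ValueError(delta["refusal"])
        if delta.get("content"):
            # Content deltas are forwarded directly
            yield delta["content"]
        elif delta.get("tool_calls"):
            # Tool-call argument fragments are accumulated across chunks
            frag = delta["tool_calls"][0].get("function", {})
            tool_call = tool_call or ToolCall()
            tool_call.name += frag.get("name", "")
            tool_call.arguments += frag.get("arguments", "")
        # Empty / heartbeat chunks fall through and are skipped
    if tool_call is not None:
        yield tool_call
```

The key point is that text is yielded immediately, while tool-call fragments are only surfaced once the stream is exhausted and the arguments are complete.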
## Streaming + Tracing

A common concern with streaming is losing observability — if chunks are consumed lazily, when does the trace fire?

Prompty solves this with the `PromptyStream` wrapper:
- The executor wraps the raw SDK iterator in a `PromptyStream`.
- As your application (or the processor) iterates, each chunk is forwarded and appended to an internal accumulator.
- When the iterator is exhausted, `PromptyStream` flushes the complete accumulated response to the active tracer.

```
iterate chunk 1 → yield + accumulate
iterate chunk 2 → yield + accumulate
iterate chunk 3 → yield + accumulate
...
StopIteration   → flush accumulated data to tracer ✓
```

The same applies to `AsyncPromptyStream` for async iteration.
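The accumulate-then-flush pattern itself is simple. A minimal sketch, assuming a plain `on_exhaust` callback in place of Prompty's tracing machinery:

```python
class AccumulatingStream:
    """Illustrative stand-in for PromptyStream: forward each chunk to the
    consumer while accumulating it, then flush everything on exhaustion."""

    def __init__(self, inner, on_exhaust):
        self._inner = iter(inner)
        self._on_exhaust = on_exhaust  # hypothetical tracer hook
        self._chunks = []

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._inner)
        except StopIteration:
            # Stream exhausted: hand the complete accumulated response
            # to the tracer, then re-raise to end iteration
            self._on_exhaust(self._chunks)
            raise
        self._chunks.append(chunk)  # accumulate for tracing
        return chunk                # forward to the consumer
```

Because the flush happens inside `StopIteration` handling, the trace fires exactly once, only after the consumer has pulled the final chunk.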
## Streaming + Agent Mode

When using `execute_agent()`, the runtime runs a tool-calling loop: call the LLM, execute any requested tools, append results, repeat.

Streaming still works inside the agent loop. The executor does not disable streaming — instead it consumes the stream through the processor internally to detect tool calls:

- The LLM streams a response with `tool_calls` deltas.
- The processor accumulates fragments and yields a `ToolCall`.
- The agent loop executes the tool function and appends the result.
- The loop re-calls the LLM (still streaming).
- When the model finally returns a content-only response, those chunks are forwarded to your application as normal text deltas.
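The loop above can be sketched as a generator. Everything here is hypothetical scaffolding — `call_llm_streaming` and `execute_tool` stand in for the runtime, and `ToolCall` mirrors what the processor yields at stream end:

```python
from dataclasses import dataclass

@dataclass
class ToolCall:
    name: str
    arguments: str

def agent_loop(call_llm_streaming, execute_tool, messages):
    """Repeat: stream the LLM, run any requested tool, until the model
    returns a content-only response."""
    while True:
        saw_tool_call = False
        # The processor yields text deltas and, at stream end, ToolCalls
        for item in call_llm_streaming(messages):
            if isinstance(item, ToolCall):
                # Execute the requested tool and append its result,
                # so the next LLM call sees it
                messages.append({"role": "tool", "content": execute_tool(item)})
                saw_tool_call = True
            else:
                yield item  # text delta forwarded to the application
        if not saw_tool_call:
            return  # content-only response: the loop ends
```

Note that text deltas reach the application unbuffered even on the final turn; only tool-call turns are consumed internally before the loop re-calls the model.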