
Streaming Responses

A prompt that streams responses chunk by chunk — ideal for chat UIs where users see tokens appear in real time. Prompty wraps the raw SDK stream in a tracing-aware PromptyStream so you get full observability without losing any data.


Set stream: true in the model’s additionalProperties — either in the .prompty file or at runtime.

chat.prompty
---
name: streaming-chat
description: A chat prompt with streaming enabled
model:
  id: gpt-4o-mini
  provider: openai
  apiType: chat
  connection:
    kind: key
    apiKey: ${env:OPENAI_API_KEY}
  options:
    temperature: 0.7
    additionalProperties:
      stream: true
inputSchema:
  properties:
    - name: question
      kind: string
      default: Tell me a joke
template:
  format:
    kind: jinja2
  parser:
    kind: prompty
---
system:
You are a helpful assistant.
user:
{{question}}

If your .prompty file doesn’t have streaming enabled, you can toggle it before execution:

from prompty import load
agent = load("chat.prompty")
agent.model.options.additionalProperties["stream"] = True

Use run() with raw=True to get the unprocessed PromptyStream, then pass it to process() which yields text chunks.

from prompty import load, run, process
agent = load("chat.prompty")
agent.model.options.additionalProperties["stream"] = True
# run() with raw=True returns the PromptyStream
stream = run(agent, inputs={"question": "Tell me a joke"}, raw=True)
# process() yields text chunks
for chunk in process(agent, stream):
    print(chunk, end="", flush=True)
print() # newline after stream completes

Each chunk is a string — the processor extracts delta.content from the raw API response objects so you don’t handle the wire format yourself.


A common concern with streaming is losing observability — if chunks are consumed lazily, when does the trace fire?

Prompty’s PromptyStream wrapper solves this:

  1. The executor wraps the raw SDK iterator in a PromptyStream.
  2. As you iterate, each chunk is forwarded to your code and appended to an internal accumulator.
  3. When the iterator is exhausted (StopIteration), the wrapper flushes the complete accumulated response to the active tracer.
iterate chunk 1 → yield + accumulate
iterate chunk 2 → yield + accumulate
iterate chunk 3 → yield + accumulate
...
StopIteration → flush accumulated data to tracer ✓
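The accumulate-then-flush pattern above can be sketched in a few lines. This is a minimal illustration, not Prompty's actual implementation — `TracingStream` and its `on_complete` callback are hypothetical names standing in for `PromptyStream` and the active tracer:

```python
class TracingStream:
    """Wraps a chunk iterator; flushes the accumulated response on exhaustion."""

    def __init__(self, name, iterator, on_complete):
        self.name = name
        self._iterator = iter(iterator)
        self._accumulated = []           # internal accumulator
        self._on_complete = on_complete  # e.g. hand the full response to the tracer

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._iterator)
        except StopIteration:
            # stream exhausted: flush everything we collected, then re-raise
            self._on_complete(self.name, self._accumulated)
            raise
        self._accumulated.append(chunk)  # accumulate for the trace
        return chunk                     # forward to the caller unchanged


# The caller sees every chunk as it arrives; the callback sees the whole response.
captured = {}
stream = TracingStream("demo", ["Hel", "lo"], lambda n, c: captured.update({n: c}))
print("".join(stream))  # prints "Hello"
print(captured)         # {'demo': ['Hel', 'lo']}
```

Because the flush happens inside `__next__` on `StopIteration`, the caller never has to remember to close anything — fully consuming the stream is enough to produce a complete trace.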

The streaming processor does more than forward raw chunks:

| Scenario | Behavior |
| --- | --- |
| Content deltas | `delta.content` strings are yielded directly to the caller |
| Tool-call deltas | Argument fragments are accumulated; a complete ToolCall is yielded when the stream ends |
| Refusal | If `delta.refusal` is present, the processor raises a `ValueError` |
| Empty / heartbeat chunks | Chunks with no content or tool-call data are silently skipped |
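The four rules above can be sketched as a small generator. This is a hedged illustration only — the plain dicts stand in for the SDK's delta objects, and `process_chunks` is a hypothetical name, not Prompty's actual processor:

```python
def process_chunks(chunks):
    """Yield content strings as they arrive; yield one complete tool call at the end."""
    tool_name, tool_args = None, []
    for delta in chunks:
        if delta.get("refusal"):              # refusal → raise
            raise ValueError(delta["refusal"])
        if delta.get("content"):              # content delta → yield directly
            yield delta["content"]
        elif delta.get("tool_call"):          # tool-call delta → accumulate fragments
            call = delta["tool_call"]
            tool_name = call.get("name") or tool_name
            tool_args.append(call.get("arguments", ""))
        # chunks with neither content nor tool-call data are silently skipped
    if tool_name is not None:                 # stream ended: emit the complete call
        yield {"tool": tool_name, "arguments": "".join(tool_args)}


chunks = [
    {"content": "Hi"},
    {},                                                       # heartbeat, skipped
    {"tool_call": {"name": "lookup", "arguments": '{"q":'}},  # fragment 1
    {"tool_call": {"arguments": '"joke"}'}},                  # fragment 2
]
print(list(process_chunks(chunks)))
# ['Hi', {'tool': 'lookup', 'arguments': '{"q":"joke"}'}]
```

Note that tool-call arguments arrive as JSON fragments, so they can only be assembled into a usable call once the stream is exhausted — which is why the complete ToolCall is yielded last.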

Here’s a self-contained example you can copy and run:

stream-demo.prompty
---
name: stream-demo
model:
  id: gpt-4o-mini
  provider: openai
  apiType: chat
  connection:
    kind: key
    apiKey: ${env:OPENAI_API_KEY}
  options:
    temperature: 0.9
    additionalProperties:
      stream: true
inputSchema:
  properties:
    - name: topic
      kind: string
      default: space exploration
template:
  format:
    kind: jinja2
  parser:
    kind: prompty
---
system:
You are a creative storyteller.
user:
Write a short story about {{topic}}.
stream_app.py
from prompty import load, run, process
agent = load("stream-demo.prompty")
stream = run(agent, inputs={"topic": "a robot learning to paint"}, raw=True)
print("Story: ", end="")
for chunk in process(agent, stream):
    print(chunk, end="", flush=True)
print("\n--- Done ---")

  • Streaming concept — architecture details on PromptyStream and AsyncPromptyStream
  • Tracing — how traces capture streaming data