
§10 Streaming

The streaming layer wraps raw SDK iterators so that streamed items can be accumulated and traced.

PromptyStream:
    // Constructor
    constructor(raw_iterator):
        self.raw = raw_iterator
        self.items = []

    // Sync iteration
    __iter__():
        for item in self.raw:
            self.items.append(item)
            yield item
        // On exhaustion:
        TRACE: emit "PromptyStream" span with result = self.items

    // Async iteration
    __aiter__():
        async for item in self.raw:
            self.items.append(item)
            yield item
        // On exhaustion:
        TRACE: emit "PromptyStream" span with result = self.items

Requirements:

  • Implementations MUST support async iteration (AsyncIterator / AsyncIterable or language equivalent).
  • Implementations SHOULD support sync iteration when the language allows it.
  • Implementations MUST accumulate all items for tracing.
  • Implementations MUST flush accumulated items to the tracer on exhaustion (not on each individual item).
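The pseudocode and requirements above could be realized in Python roughly as follows. This is a minimal sketch, not a reference implementation: the `trace` callback and its `(span_name, result)` signature are assumptions made for illustration, since this section does not define a tracer API.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Iterator, List

# Hypothetical tracer hook: called as trace(span_name, result).
# The real tracer interface is not defined in this section.
TraceFn = Callable[[str, List[Any]], None]


class PromptyStream:
    """Wraps a raw SDK iterator, accumulating items and tracing once on exhaustion."""

    def __init__(self, raw_iterator: Any, trace: TraceFn) -> None:
        self.raw = raw_iterator
        self.items: List[Any] = []
        self._trace = trace  # assumption: injected instead of a global tracer

    def __iter__(self) -> Iterator[Any]:
        # Sync iteration: requires self.raw to be a regular iterable.
        for item in self.raw:
            self.items.append(item)
            yield item
        # Reached only after the raw iterator is exhausted,
        # so the span is emitted once, not per item.
        self._trace("PromptyStream", self.items)

    async def __aiter__(self) -> AsyncIterator[Any]:
        # Async iteration: requires self.raw to be an async iterable.
        async for item in self.raw:
            self.items.append(item)
            yield item
        self._trace("PromptyStream", self.items)


if __name__ == "__main__":
    spans: list = []
    stream = PromptyStream(iter(["a", "b"]), lambda name, result: spans.append((name, list(result))))
    for _ in stream:
        pass
    print(spans)
```

Because the trace call sits after the loop, a consumer that abandons the stream early never triggers the span. Whether a partial flush is desirable in that case is left open here.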

The processor wraps the raw SDK stream in a PromptyStream, then yields processed content:

async function process_stream(agent, raw_stream) → async generator:
    prompty_stream = PromptyStream(raw_stream)
    async for chunk in prompty_stream:
        // Extract delta content / tool calls from chunk
        yield processed_delta

The PromptyStream wrapper MUST be the innermost wrapper around the SDK iterator. Any processing logic operates on top of it.

If a streaming chunk contains a refusal field (OpenAI-specific), implementations MUST raise a ValueError with the refusal message immediately upon encountering the chunk. The stream MUST NOT continue after a refusal is detected.