
§8 Processing

Processing extracts the final result from raw LLM API responses. Every pipeline invocation that calls an executor MUST subsequently call the processor.

Public API:

  • process(agent, response) → result
  • process_async(agent, response) → result

Both variants MUST emit a process trace span.
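
As a minimal sketch (assuming a hypothetical `trace_span` helper and dict-shaped agents; neither name is mandated by this spec), the public entry point might route to a per-API processor while opening the required span:

```python
from contextlib import contextmanager

@contextmanager
def trace_span(name):
    # Stand-in for the real tracing facility; an actual span would record
    # timing and attributes.
    yield

def process(agent, response):
    # Route the raw response to the processor for the agent's API kind.
    processors = {
        "chat": lambda a, r: r["text"],        # stand-in processors
        "embedding": lambda a, r: r["vector"],
    }
    with trace_span("process"):
        return processors[agent["api"]](agent, response)

result = process({"api": "chat"}, {"text": "hello"})
```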

function process_chat(agent, response) → result:
    choice = response.choices[0]
    message = choice.message
    // Tool calls take priority
    if message.tool_calls and len(message.tool_calls) > 0:
        return [
            ToolCall(
                id: tc.id,
                name: tc.function.name,
                arguments: tc.function.arguments
            )
            for tc in message.tool_calls
        ]
    // Normal content
    content = message.content or ""
    // Structured output: JSON-parse if outputs is defined
    if agent.outputs and content:
        try:
            return json_parse(content)
        except ParseError:
            return content // fallback to raw string
    return content

The processor MUST check for tool calls before extracting content. When tool calls are present, the content field MUST be ignored and the full list of ToolCall objects returned.
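
The same logic can be sketched in Python (dict-shaped responses are assumed here for illustration; real SDKs return typed objects):

```python
import json
from dataclasses import dataclass

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: str  # JSON-encoded argument string

def process_chat(agent, response):
    message = response["choices"][0]["message"]
    # Tool calls take priority; content is ignored when they are present.
    if message.get("tool_calls"):
        return [
            ToolCall(tc["id"], tc["function"]["name"], tc["function"]["arguments"])
            for tc in message["tool_calls"]
        ]
    content = message.get("content") or ""
    if agent.get("outputs") and content:
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            return content  # fall back to the raw string
    return content

calls = process_chat(
    {},
    {"choices": [{"message": {"tool_calls": [
        {"id": "c1", "function": {"name": "add", "arguments": '{"a": 1}'}}
    ]}}]},
)
```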

function process_embedding(agent, response) → result:
    data = response.data
    if len(data) == 0:
        raise ValueError("Empty embedding response")
    if len(data) == 1:
        return data[0].embedding // single float[]
    return [item.embedding for item in data] // list of float[]

Implementations MUST return a single vector (not a list of one) when exactly one embedding is present.
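
A sketch of the single-vector rule, again over dict-shaped responses:

```python
def process_embedding(agent, response):
    data = response["data"]
    if not data:
        raise ValueError("Empty embedding response")
    if len(data) == 1:
        return data[0]["embedding"]  # bare vector, not a one-element list
    return [item["embedding"] for item in data]

single = process_embedding({}, {"data": [{"embedding": [0.1, 0.2]}]})
batch = process_embedding({}, {"data": [{"embedding": [0.1]}, {"embedding": [0.2]}]})
```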

function process_image(agent, response) → result:
    data = response.data
    if len(data) == 0:
        raise ValueError("Empty image response")
    results = []
    for item in data:
        if item.url:
            results.append(item.url)
        elif item.b64_json:
            results.append(item.b64_json)
    if len(results) == 1:
        return results[0] // single string
    return results // list of strings

url MUST be preferred over b64_json when both are present on the same item.
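
The preference order falls out of the `if`/`elif` ordering, as this sketch shows (dict-shaped items assumed):

```python
def process_image(agent, response):
    data = response["data"]
    if not data:
        raise ValueError("Empty image response")
    results = []
    for item in data:
        if item.get("url"):              # url wins when both fields are set
            results.append(item["url"])
        elif item.get("b64_json"):
            results.append(item["b64_json"])
    return results[0] if len(results) == 1 else results

one = process_image({}, {"data": [{"url": "https://example/img.png", "b64_json": "aGk="}]})
```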

function process_responses(agent, response) → result:
    // Error check
    if response.error:
        raise ValueError(response.error.message)
    // Function calls
    function_calls = [
        item for item in response.output
        if item.type == "function_call"
    ]
    if function_calls:
        return [
            ToolCall(
                id: fc.call_id,
                name: fc.name,
                arguments: fc.arguments
            )
            for fc in function_calls
        ]
    // Text output
    output_text = response.output_text // convenience accessor
    // Structured output
    if agent.outputs and output_text:
        try:
            return json_parse(output_text)
        except ParseError:
            return output_text
    return output_text
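
A Python sketch of the same flow (dict-shaped responses assumed; note that the Responses API uses `call_id`, not `id`, on function-call items):

```python
import json
from dataclasses import dataclass

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: str

def process_responses(agent, response):
    if response.get("error"):
        raise ValueError(response["error"]["message"])
    function_calls = [i for i in response["output"] if i["type"] == "function_call"]
    if function_calls:
        # The item's call_id becomes the ToolCall id.
        return [ToolCall(fc["call_id"], fc["name"], fc["arguments"]) for fc in function_calls]
    text = response.get("output_text", "")
    if agent.get("outputs") and text:
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text
    return text

r = process_responses(
    {},
    {"output": [{"type": "function_call", "call_id": "c1", "name": "f", "arguments": "{}"}]},
)
```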

function process_anthropic(agent, response) → result:
    // Tool use blocks take priority
    tool_blocks = [
        b for b in response.content
        if b.type == "tool_use"
    ]
    if tool_blocks:
        return [
            ToolCall(
                id: b.id,
                name: b.name,
                arguments: json_serialize(b.input)
            )
            for b in tool_blocks
        ]
    // Text content
    text_blocks = [
        b for b in response.content
        if b.type == "text"
    ]
    content = text_blocks[0].text if text_blocks else ""
    // Structured output
    if agent.outputs and content:
        try:
            return json_parse(content)
        except ParseError:
            return content
    return content
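
The notable step here is `json_serialize(b.input)`: Anthropic delivers tool input as a structured object, so it must be re-serialized to keep `arguments` a JSON string across providers. A dict-shaped sketch:

```python
import json
from dataclasses import dataclass

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: str

def process_anthropic(agent, response):
    tool_blocks = [b for b in response["content"] if b["type"] == "tool_use"]
    if tool_blocks:
        # input is a dict; serialize it so arguments is a JSON string,
        # matching the other providers.
        return [ToolCall(b["id"], b["name"], json.dumps(b["input"])) for b in tool_blocks]
    text_blocks = [b for b in response["content"] if b["type"] == "text"]
    content = text_blocks[0]["text"] if text_blocks else ""
    if agent.get("outputs") and content:
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            return content
    return content

calls = process_anthropic(
    {},
    {"content": [{"type": "tool_use", "id": "t1", "name": "f", "input": {"x": 1}}]},
)
```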

Streaming responses MUST be processed incrementally. The processor consumes chunks from a PromptyStream (§10 Streaming) and yields extracted deltas:

function process_stream(response_stream) → generator:
    // response_stream is a PromptyStream wrapping the raw SDK iterator
    tool_calls = {} // accumulate partial tool calls by index
    for chunk in response_stream:
        delta = chunk.choices[0].delta if chunk.choices else null
        if delta is null:
            continue
        // Content chunks
        if delta.content:
            yield delta.content
        // Tool call chunks (accumulated by index)
        if delta.tool_calls:
            for tc_chunk in delta.tool_calls:
                idx = tc_chunk.index
                if idx not in tool_calls:
                    tool_calls[idx] = ToolCall(
                        id: tc_chunk.id,
                        name: tc_chunk.function.name,
                        arguments: ""
                    )
                if tc_chunk.function.arguments:
                    tool_calls[idx].arguments += tc_chunk.function.arguments
        // Refusal
        if delta.refusal:
            raise ValueError("Model refused: " + delta.refusal)
    // After stream exhaustion, yield accumulated tool calls
    if tool_calls:
        for idx in sorted(tool_calls.keys()):
            yield tool_calls[idx]
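
The accumulation pattern can be sketched as a Python generator (the chunk shape is flattened to a bare `delta` dict for brevity; a real stream nests it under `chunk.choices[0]`):

```python
from dataclasses import dataclass

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: str = ""

def process_stream(chunks):
    """Yield content deltas as they arrive; yield accumulated tool calls last."""
    tool_calls = {}
    for chunk in chunks:
        delta = chunk.get("delta")
        if delta is None:
            continue
        if delta.get("content"):
            yield delta["content"]
        for tc in delta.get("tool_calls", []):
            idx = tc["index"]
            if idx not in tool_calls:
                # First fragment carries id and name; later fragments only
                # append to the arguments string.
                tool_calls[idx] = ToolCall(tc["id"], tc["function"]["name"])
            if tc["function"].get("arguments"):
                tool_calls[idx].arguments += tc["function"]["arguments"]
        if delta.get("refusal"):
            raise ValueError("Model refused: " + delta["refusal"])
    for idx in sorted(tool_calls):
        yield tool_calls[idx]

parts = list(process_stream([
    {"delta": {"content": "Hel"}},
    {"delta": {"content": "lo"}},
]))
```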

All processors MUST return tool calls using this structure:

ToolCall:
    id: string        // unique identifier assigned by the API
    name: string      // function name to invoke
    arguments: string // JSON-encoded arguments string

The arguments field MUST always be a JSON string, regardless of provider. Callers MUST call json_parse(tool_call.arguments) to obtain the argument dict.
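
For example, with the dataclass form of the structure used in the sketches above:

```python
import json
from dataclasses import dataclass

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: str  # always a JSON-encoded string, on every provider

call = ToolCall(id="call_1", name="get_weather", arguments='{"city": "Paris"}')
args = json.loads(call.arguments)  # callers parse to obtain the dict
```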