
Performance Optimization

Optimizing Prompty applications involves multiple aspects: prompt engineering, runtime configuration, caching strategies, and resource management. This guide provides best practices and techniques to maximize performance.

Reduce token consumption to improve speed and reduce costs:

# ❌ Verbose prompt
---
name: Verbose Summary
---
system:
Please carefully read through the following document and provide a comprehensive, detailed summary that covers all the important points, key findings, major conclusions, and significant details mentioned in the text. Make sure to include background information and context.
user:
{{document}}
# ✅ Concise prompt
---
name: Efficient Summary
---
system:
Summarize the key points from this document in 3-4 sentences.
user:
{{document}}

Request structured responses to reduce parsing overhead:

---
name: Structured Response
---
system:
Respond in JSON format with keys: "answer", "confidence", "reasoning".
user:
{{question}}

Optimize template rendering:

# ✅ Compile the template once, then render it repeatedly.
from prompty.renderers import Jinja2Renderer

renderer = Jinja2Renderer()
template = renderer.compile("Hello {{name}}, your order {{order_id}} is {{status}}")

# Each iteration only renders — no recompilation cost inside the loop.
for data in orders:
    result = template.render(data)

Load prompts once and reuse them across executions to avoid repeated parsing and connection-setup overhead:

import prompty
import prompty.azure

# ✅ Load the prompt a single time; every execute() below reuses it.
prompt = prompty.load("customer_support.prompty")


def process_requests(requests):
    """Run the pre-loaded prompt for each request, reusing the existing connection."""
    return [prompty.execute(prompt, inputs=request) for request in requests]

Process multiple requests efficiently:

import asyncio
import prompty


async def batch_process(prompts_data):
    """Execute one prompt per input concurrently and gather all results."""
    tasks = [
        prompty.execute_async("prompt.prompty", inputs=data)
        for data in prompts_data
    ]
    # return_exceptions=True keeps a single failure from cancelling the batch.
    return await asyncio.gather(*tasks, return_exceptions=True)


# Usage
data_batch = [
    {"query": "What is AI?"},
    {"query": "Explain machine learning"},
    {"query": "What is deep learning?"},
]
results = asyncio.run(batch_process(data_batch))

Use streaming for better perceived performance:

def stream_response(prompt_path, inputs):
    """Yield response chunks as they arrive.

    The full response text is also accumulated and returned via the
    generator's StopIteration value for callers that want the whole string.
    """
    stream = prompty.execute(
        prompt_path,
        inputs=inputs,
        stream=True,
    )
    collected = []
    for chunk in stream:
        collected.append(chunk)
        # Hand each chunk to the caller immediately (e.g. to update a UI).
        yield chunk
    return ''.join(collected)


# Usage
for chunk in stream_response("chat.prompty", {"message": "Hello"}):
    print(chunk, end="", flush=True)

Cache responses for repeated queries:

from functools import lru_cache
import hashlib
import json
class PromptyCache:
    """Bounded in-memory cache of prompty execution results (FIFO eviction)."""

    def __init__(self, max_size=1000):
        self.cache = {}
        self.max_size = max_size

    def _hash_inputs(self, prompt_path, inputs, config):
        """Create hash key for caching"""
        payload = json.dumps(
            {
                "prompt": prompt_path,
                "inputs": inputs,
                "config": config,
            },
            sort_keys=True,
        )
        return hashlib.md5(payload.encode()).hexdigest()

    def get_or_execute(self, prompt_path, inputs=None, configuration=None):
        """Return the cached result for this call, executing on a cache miss."""
        key = self._hash_inputs(prompt_path, inputs or {}, configuration or {})
        if key in self.cache:
            return self.cache[key]
        result = prompty.execute(
            prompt_path,
            inputs=inputs,
            configuration=configuration,
        )
        if len(self.cache) >= self.max_size:
            # Evict the oldest insertion to stay within max_size.
            self.cache.pop(next(iter(self.cache)))
        self.cache[key] = result
        return result
# Usage: repeated identical (prompt, inputs) calls after the first are
# answered from the in-memory cache instead of re-executing the prompt.
cache = PromptyCache()
result = cache.get_or_execute("faq.prompty", {"question": "What are your hours?"})

Use Redis for distributed caching:

import hashlib  # BUGFIX: was missing — hashlib.md5 is used to build cache keys
import json

import redis

import prompty


class RedisPromptyCache:
    """Caches prompty execution results in Redis with a per-entry TTL,
    so multiple processes/hosts can share one cache."""

    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = ttl  # seconds each cached entry remains valid

    def get_or_execute(self, prompt_path, inputs=None, configuration=None):
        """Return the cached result for this call, executing on a cache miss.

        The key hashes (prompt path, inputs, configuration) so different
        argument combinations never collide; results must be JSON-serializable.
        """
        cache_data = {
            "prompt": prompt_path,
            "inputs": inputs or {},
            "config": configuration or {},
        }
        digest = hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()
        cache_key = f"prompty:{digest}"
        # Try cache first
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)
        # Execute, then store with TTL (setex sets value and expiry atomically).
        result = prompty.execute(prompt_path, inputs=inputs, configuration=configuration)
        self.redis_client.setex(cache_key, self.ttl, json.dumps(result))
        return result

Cache compiled templates:

from prompty.renderers import Jinja2Renderer


class TemplateCache:
    """Memoizes compiled Jinja2 templates keyed by their source string."""

    def __init__(self):
        self.cache = {}
        self.renderer = Jinja2Renderer()

    def get_compiled_template(self, template_string):
        """Return a compiled template, compiling only on the first request."""
        if template_string not in self.cache:
            compiled = self.renderer.compile(template_string)
            self.cache[template_string] = compiled
        return self.cache[template_string]


# Global template cache
template_cache = TemplateCache()

Select models based on performance requirements:

# ✅ Fast responses for simple tasks
simple_config = {
    "type": "azure_openai",
    "azure_deployment": "gpt-35-turbo",  # smaller model, lower latency than GPT-4
    "temperature": 0.1,
    "max_tokens": 100,  # cap output length to keep responses quick
}

# ✅ Quality responses for complex tasks
complex_config = {
    "type": "azure_openai",
    "azure_deployment": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 2000,
}


def get_config_for_task(complexity):
    """Use the heavyweight config only when complexity is rated "high"."""
    if complexity == "high":
        return complex_config
    return simple_config

Fine-tune parameters for performance:

# Speed-optimized configuration: short, tightly constrained outputs.
speed_config = dict(
    temperature=0.1,
    max_tokens=200,       # limit response length
    top_p=0.9,            # narrow nucleus sampling
    frequency_penalty=0,  # penalties disabled
    presence_penalty=0,
)

# Quality-optimized configuration: longer, less constrained outputs.
quality_config = dict(
    temperature=0.7,
    max_tokens=1000,
    top_p=1.0,
    frequency_penalty=0.1,
    presence_penalty=0.1,
)

Manage memory usage in long-running applications:

import gc
import prompty


class PromptyManager:
    """Keeps a small, bounded cache of loaded prompts for repeated execution."""

    def __init__(self):
        self.loaded_prompts = {}
        self.max_loaded = 10

    def get_prompt(self, prompt_path):
        """Return the loaded prompt, loading (and evicting if full) on a miss."""
        if prompt_path not in self.loaded_prompts:
            if len(self.loaded_prompts) >= self.max_loaded:
                # Evict the oldest-loaded entry (dict insertion order = FIFO).
                evict_path = next(iter(self.loaded_prompts))
                del self.loaded_prompts[evict_path]
                gc.collect()  # encourage prompt reclamation of the freed object
            self.loaded_prompts[prompt_path] = prompty.load(prompt_path)
        return self.loaded_prompts[prompt_path]

    def execute(self, prompt_path, **kwargs):
        """Execute a prompt by path, resolving it through the cache first."""
        return prompty.execute(self.get_prompt(prompt_path), **kwargs)


# Usage
manager = PromptyManager()
result = manager.execute("chat.prompty", inputs={"message": "Hello"})

Process large datasets without loading everything into memory:

def process_large_dataset(data_file, prompt_path, batch_size=100):
    """Process large dataset in batches"""
    pending = []
    with open(data_file, 'r') as f:
        for line in f:
            pending.append(json.loads(line))
            if len(pending) < batch_size:
                continue
            # A full batch is ready: process it, then free its memory.
            yield process_batch(pending, prompt_path)
            pending.clear()
            gc.collect()
    # Flush the final partial batch, if any.
    if pending:
        yield process_batch(pending, prompt_path)


def process_batch(batch, prompt_path):
    """Execute the prompt once per item, returning results in input order."""
    results = []
    for item in batch:
        results.append(prompty.execute(prompt_path, inputs=item))
    return results

Track performance metrics:

import time
from prompty.tracer import trace


class PerformanceMonitor:
    """Collects simple latency and volume counters around prompty executions."""

    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "total_time": 0,
            "avg_response_time": 0,
            "cache_hits": 0,
            "cache_misses": 0,
        }

    @trace
    def execute_with_monitoring(self, prompt_path, **kwargs):
        """Execute a prompt and fold its wall-clock latency into the metrics."""
        started = time.time()
        try:
            result = prompty.execute(prompt_path, **kwargs)
        except Exception as e:
            print(f"Error in execution: {e}")
            raise
        elapsed = time.time() - started
        stats = self.metrics
        stats["total_requests"] += 1
        stats["total_time"] += elapsed
        stats["avg_response_time"] = stats["total_time"] / stats["total_requests"]
        return result

    def get_stats(self):
        """Return a snapshot copy of the collected metrics."""
        return self.metrics.copy()


# Usage
monitor = PerformanceMonitor()
result = monitor.execute_with_monitoring("prompt.prompty", inputs={"test": "data"})
print(monitor.get_stats())

Profile your application to identify bottlenecks:

import cProfile
import pstats
import prompty


def profile_prompty_execution():
    """Profile Prompty execution to find bottlenecks"""
    profiler = cProfile.Profile()
    profiler.enable()
    # Workload under measurement — replace with your own prompty calls.
    for i in range(10):
        result = prompty.execute("test.prompty", inputs={"iteration": i})
    profiler.disable()
    # Report the 20 most expensive functions by cumulative time.
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative')
    stats.print_stats(20)


# Run profiling
profile_prompty_execution()

Test application performance under load:

import asyncio
import time
import statistics


async def load_test(prompt_path, concurrent_requests=10, total_requests=100):
    """Perform load testing on Prompty application"""
    # Semaphore caps how many requests are in flight simultaneously.
    gate = asyncio.Semaphore(concurrent_requests)

    async def timed_call():
        # Returns (elapsed_seconds, succeeded) for one request.
        began = time.time()
        try:
            await prompty.execute_async(
                prompt_path,
                inputs={"test": "load testing"},
            )
            return time.time() - began, True
        except Exception:
            return time.time() - began, False

    async def gated_call():
        async with gate:
            return await timed_call()

    suite_start = time.time()
    outcomes = await asyncio.gather(*(gated_call() for _ in range(total_requests)))
    total_time = time.time() - suite_start

    durations = [elapsed for elapsed, _ in outcomes]
    success_count = sum(1 for _, ok in outcomes if ok)
    print(f"Load Test Results:")
    print(f"Total requests: {total_requests}")
    print(f"Successful requests: {success_count}")
    print(f"Failed requests: {total_requests - success_count}")
    print(f"Total time: {total_time:.2f}s")
    print(f"Requests per second: {total_requests / total_time:.2f}")
    print(f"Average response time: {statistics.mean(durations):.2f}s")
    print(f"95th percentile: {statistics.quantiles(durations, n=20)[18]:.2f}s")


# Run load test
asyncio.run(load_test("api.prompty", concurrent_requests=5, total_requests=50))

Optimize connection pools for production:

# For Azure OpenAI
import httpx

# Pool settings: up to 20 idle keep-alive connections (expiring after 30 s)
# within a 100-connection ceiling; all operations time out after 30 s.
pool_limits = httpx.Limits(
    max_keepalive_connections=20,
    max_connections=100,
    keepalive_expiry=30.0,
)
http_client = httpx.AsyncClient(limits=pool_limits, timeout=httpx.Timeout(30.0))

# Use in configuration
config = {
    "type": "azure_openai",
    "azure_endpoint": "https://your-endpoint.openai.azure.com/",
    "http_client": http_client,
}

Implement rate limiting to prevent API quota exhaustion:

import asyncio
import time
from asyncio import Semaphore


class RateLimiter:
    """Sliding-window rate limiter for async prompty calls.

    The semaphore bounds how many callers are between acquire() and
    release() at once; the timestamp list enforces the per-minute budget.
    """

    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.semaphore = Semaphore(requests_per_minute)
        self.request_times = []  # start times of requests within the last 60 s

    async def acquire(self):
        """Block until a request may proceed under the per-minute budget."""
        await self.semaphore.acquire()
        current_time = time.time()
        # Remove requests older than 1 minute
        self.request_times = [
            t for t in self.request_times
            if current_time - t < 60
        ]
        if len(self.request_times) >= self.requests_per_minute:
            # Wait until the oldest request falls out of the 60 s window.
            sleep_time = 60 - (current_time - self.request_times[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
            # BUGFIX: re-read the clock and re-prune after sleeping; the
            # original appended the pre-sleep timestamp, which skewed every
            # subsequent window calculation by up to the full sleep duration.
            current_time = time.time()
            self.request_times = [
                t for t in self.request_times
                if current_time - t < 60
            ]
        self.request_times.append(current_time)

    def release(self):
        """Release the concurrency slot taken by acquire()."""
        self.semaphore.release()


# Usage
rate_limiter = RateLimiter(requests_per_minute=30)


async def rate_limited_execute(prompt_path, **kwargs):
    """Execute a prompt under the global rate limit, always releasing the slot."""
    await rate_limiter.acquire()
    try:
        return await prompty.execute_async(prompt_path, **kwargs)
    finally:
        rate_limiter.release()

Want to Contribute To the Project? - Updated Guidance Coming Soon.