# Performance Optimization

Optimizing Prompty applications involves multiple aspects: prompt engineering, runtime configuration, caching strategies, and resource management. This guide provides best practices and techniques for maximizing performance.
## Prompt Engineering for Performance

### Optimize Token Usage

Reduce token consumption to improve speed and reduce costs:
```
# ❌ Verbose prompt
---
name: Verbose Summary
---
system:
Please carefully read through the following document and provide a comprehensive, detailed summary that covers all the important points, key findings, major conclusions, and significant details mentioned in the text. Make sure to include background information and context.

user:
{{document}}
```

```
# ✅ Concise prompt
---
name: Efficient Summary
---
system:
Summarize the key points from this document in 3-4 sentences.

user:
{{document}}
```
### Use Structured Outputs

Request structured responses to reduce parsing overhead:
```
---
name: Structured Response
---
system:
Respond in JSON format with keys: "answer", "confidence", "reasoning".

user:
{{question}}
```
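When the model is asked for JSON like this, the application still has to parse the response defensively, since a model can occasionally ignore the format instruction. A minimal sketch of that parsing step (the key names match the prompt above; the fallback behavior is an assumption, not part of Prompty):

```python
import json

def parse_structured_response(raw: str) -> dict:
    """Parse a JSON-formatted model response, falling back gracefully."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # The model ignored the format instruction; wrap the raw text instead
        return {"answer": raw, "confidence": None, "reasoning": None}
    if not isinstance(data, dict):
        return {"answer": str(data), "confidence": None, "reasoning": None}
    # Guarantee the expected keys exist even if some were omitted
    for key in ("answer", "confidence", "reasoning"):
        data.setdefault(key, None)
    return data
```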
### Template Optimization

Optimize template rendering:
```python
# ✅ Pre-compile templates for reuse
from prompty.renderers import Jinja2Renderer

renderer = Jinja2Renderer()
template = renderer.compile("Hello {{name}}, your order {{order_id}} is {{status}}")

# Example data
orders = [{"name": "Ada", "order_id": 1001, "status": "shipped"}]

# Reuse the compiled template
for data in orders:
    result = template.render(data)
```
## Runtime Optimization

### Connection Pooling

Reuse connections to reduce overhead:
```python
import prompty
import prompty.azure

# ✅ Load the prompt once, execute multiple times
prompt = prompty.load("customer_support.prompty")

def process_requests(requests):
    results = []
    for request in requests:
        # Reuses the existing connection
        result = prompty.execute(prompt, inputs=request)
        results.append(result)
    return results
```
### Batch Processing

Process multiple requests efficiently:
```python
import asyncio
import prompty

async def batch_process(prompts_data):
    # Create tasks for concurrent execution
    tasks = [
        prompty.execute_async("prompt.prompty", inputs=data)
        for data in prompts_data
    ]

    # Execute concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Usage
data_batch = [
    {"query": "What is AI?"},
    {"query": "Explain machine learning"},
    {"query": "What is deep learning?"},
]

results = asyncio.run(batch_process(data_batch))
```
### Streaming for Large Responses

Use streaming for better perceived performance:
```python
import prompty

def stream_response(prompt_path, inputs):
    # Get a streaming response
    stream = prompty.execute(prompt_path, inputs=inputs, stream=True)

    # Yield chunks as they arrive so the caller can update the UI incrementally
    for chunk in stream:
        yield chunk

# Usage
for chunk in stream_response("chat.prompty", {"message": "Hello"}):
    print(chunk, end="", flush=True)
```
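Streaming mainly improves time-to-first-token, so that is the metric worth measuring when evaluating it. A small, self-contained helper sketch (hedged: `chunks` stands in for any iterable of streamed text, such as the stream returned above):

```python
import time

def time_to_first_chunk(chunks):
    """Seconds until the first chunk arrives, or None if the stream is empty."""
    start = time.time()
    for _chunk in chunks:
        return time.time() - start
    return None
```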
## Caching Strategies

### Response Caching

Cache responses for repeated queries:
```python
import hashlib
import json
import prompty

class PromptyCache:
    def __init__(self, max_size=1000):
        self.cache = {}
        self.max_size = max_size

    def _hash_inputs(self, prompt_path, inputs, config):
        """Create a hash key for caching."""
        key_data = {"prompt": prompt_path, "inputs": inputs, "config": config}
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_str.encode()).hexdigest()

    def get_or_execute(self, prompt_path, inputs=None, configuration=None):
        cache_key = self._hash_inputs(prompt_path, inputs or {}, configuration or {})

        if cache_key in self.cache:
            return self.cache[cache_key]

        # Execute and cache the result
        result = prompty.execute(prompt_path, inputs=inputs, configuration=configuration)

        # Manage cache size (FIFO eviction: dicts preserve insertion order)
        if len(self.cache) >= self.max_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]

        self.cache[cache_key] = result
        return result

# Usage
cache = PromptyCache()
result = cache.get_or_execute("faq.prompty", {"question": "What are your hours?"})
```
### Redis Caching

Use Redis for distributed caching:
```python
import hashlib
import json
import redis
import prompty

class RedisPromptyCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = ttl

    def get_or_execute(self, prompt_path, inputs=None, configuration=None):
        # Create the cache key
        cache_data = {
            "prompt": prompt_path,
            "inputs": inputs or {},
            "config": configuration or {},
        }
        digest = hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()
        cache_key = f"prompty:{digest}"

        # Try the cache first
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        # Execute and cache with a TTL
        result = prompty.execute(prompt_path, inputs=inputs, configuration=configuration)
        self.redis_client.setex(cache_key, self.ttl, json.dumps(result))
        return result
```
### Template Caching

Cache compiled templates:
```python
from prompty.renderers import Jinja2Renderer

class TemplateCache:
    def __init__(self):
        self.cache = {}
        self.renderer = Jinja2Renderer()

    def get_compiled_template(self, template_string):
        if template_string not in self.cache:
            self.cache[template_string] = self.renderer.compile(template_string)
        return self.cache[template_string]

# Global template cache
template_cache = TemplateCache()
```
## Model Configuration Optimization

### Choose Appropriate Models

Select models based on performance requirements:
```python
# ✅ Fast responses for simple tasks
simple_config = {
    "type": "azure_openai",
    "azure_deployment": "gpt-35-turbo",  # Faster than GPT-4
    "temperature": 0.1,  # Low temperature for focused, deterministic output
    "max_tokens": 100,   # Short responses return faster
}

# ✅ Quality responses for complex tasks
complex_config = {
    "type": "azure_openai",
    "azure_deployment": "gpt-4",
    "temperature": 0.7,
    "max_tokens": 2000,
}

def get_config_for_task(complexity):
    return complex_config if complexity == "high" else simple_config
```
### Optimize Model Parameters

Fine-tune parameters for performance:
```python
# Speed-optimized configuration
speed_config = {
    "temperature": 0.1,      # Deterministic output for simple tasks
    "max_tokens": 200,       # Limiting response length is the main latency lever
    "top_p": 0.9,            # Narrow the sampling pool
    "frequency_penalty": 0,  # No penalties needed for short responses
    "presence_penalty": 0,
}

# Quality-optimized configuration
quality_config = {
    "temperature": 0.7,
    "max_tokens": 1000,
    "top_p": 1.0,
    "frequency_penalty": 0.1,
    "presence_penalty": 0.1,
}
```
## Memory Optimization

### Efficient Resource Management

Manage memory usage in long-running applications:
```python
import gc
import prompty

class PromptyManager:
    def __init__(self):
        self.loaded_prompts = {}
        self.max_loaded = 10

    def get_prompt(self, prompt_path):
        # Load the prompt if it is not cached
        if prompt_path not in self.loaded_prompts:
            # Clean up if the cache is full: evict the oldest-loaded prompt
            if len(self.loaded_prompts) >= self.max_loaded:
                oldest_path = next(iter(self.loaded_prompts))
                del self.loaded_prompts[oldest_path]
                gc.collect()  # Encourage garbage collection

            self.loaded_prompts[prompt_path] = prompty.load(prompt_path)

        return self.loaded_prompts[prompt_path]

    def execute(self, prompt_path, **kwargs):
        prompt = self.get_prompt(prompt_path)
        return prompty.execute(prompt, **kwargs)

# Usage
manager = PromptyManager()
result = manager.execute("chat.prompty", inputs={"message": "Hello"})
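The eviction above removes the oldest-loaded prompt (FIFO order) rather than the least recently used one. If true LRU eviction matters, `OrderedDict` makes it straightforward; this is a standalone sketch of the pattern, not part of the Prompty API:

```python
from collections import OrderedDict

class LRUCache:
    def __init__(self, max_size=10):
        self.cache = OrderedDict()
        self.max_size = max_size

    def get(self, key, load):
        """Return the cached value for key, calling load(key) on a miss."""
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as recently used
        else:
            if len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)  # evict the least recently used
            self.cache[key] = load(key)
        return self.cache[key]
```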
### Stream Processing for Large Datasets

Process large datasets without loading everything into memory:
```python
import gc
import json
import prompty

def process_large_dataset(data_file, prompt_path, batch_size=100):
    """Process a large JSON Lines dataset in batches."""
    batch = []

    with open(data_file, "r") as f:
        for line in f:
            batch.append(json.loads(line))

            if len(batch) >= batch_size:
                # Process the batch
                yield process_batch(batch, prompt_path)

                # Clear the batch to free memory
                batch.clear()
                gc.collect()

    # Process remaining items
    if batch:
        yield process_batch(batch, prompt_path)

def process_batch(batch, prompt_path):
    return [prompty.execute(prompt_path, inputs=item) for item in batch]
```
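The batching logic above is easier to test when separated from file I/O. A generic sketch of the same pattern (`batched` here is a hypothetical helper, not a Prompty API):

```python
def batched(items, batch_size):
    """Group any iterable into lists of at most batch_size items."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # remaining partial batch

print(list(batched(range(5), 2)))  # → [[0, 1], [2, 3], [4]]
```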
## Monitoring and Profiling

### Performance Monitoring

Track performance metrics:
```python
import time
import prompty
from prompty.tracer import trace

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "total_time": 0,
            "avg_response_time": 0,
            "cache_hits": 0,
            "cache_misses": 0,
        }

    @trace
    def execute_with_monitoring(self, prompt_path, **kwargs):
        start_time = time.time()

        try:
            result = prompty.execute(prompt_path, **kwargs)
            self.metrics["total_requests"] += 1

            execution_time = time.time() - start_time
            self.metrics["total_time"] += execution_time
            self.metrics["avg_response_time"] = (
                self.metrics["total_time"] / self.metrics["total_requests"]
            )

            return result

        except Exception as e:
            print(f"Error in execution: {e}")
            raise

    def get_stats(self):
        return self.metrics.copy()

# Usage
monitor = PerformanceMonitor()
result = monitor.execute_with_monitoring("prompt.prompty", inputs={"test": "data"})
print(monitor.get_stats())
```
### Profiling

Profile your application to identify bottlenecks:
```python
import cProfile
import pstats
import prompty

def profile_prompty_execution():
    """Profile Prompty execution to find bottlenecks."""
    profiler = cProfile.Profile()
    profiler.enable()

    # Your Prompty code here
    for i in range(10):
        result = prompty.execute("test.prompty", inputs={"iteration": i})

    profiler.disable()

    # Analyze the results
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumulative")
    stats.print_stats(20)  # Top 20 functions

# Run profiling
profile_prompty_execution()
```
Section titled “Load Testing”Concurrent Load Testing
Section titled “Concurrent Load Testing”Test application performance under load:
```python
import asyncio
import statistics
import time
import prompty

async def load_test(prompt_path, concurrent_requests=10, total_requests=100):
    """Perform load testing on a Prompty application."""

    async def single_request():
        start_time = time.time()
        try:
            result = await prompty.execute_async(
                prompt_path, inputs={"test": "load testing"}
            )
            return time.time() - start_time, True
        except Exception:
            return time.time() - start_time, False

    # Use a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(concurrent_requests)

    async def limited_request():
        async with semaphore:
            return await single_request()

    # Execute the load test
    start_time = time.time()
    tasks = [limited_request() for _ in range(total_requests)]
    results = await asyncio.gather(*tasks)
    total_time = time.time() - start_time

    # Analyze the results
    response_times = [r[0] for r in results]
    success_count = sum(1 for r in results if r[1])

    print("Load Test Results:")
    print(f"Total requests: {total_requests}")
    print(f"Successful requests: {success_count}")
    print(f"Failed requests: {total_requests - success_count}")
    print(f"Total time: {total_time:.2f}s")
    print(f"Requests per second: {total_requests / total_time:.2f}")
    print(f"Average response time: {statistics.mean(response_times):.2f}s")
    print(f"95th percentile: {statistics.quantiles(response_times, n=20)[18]:.2f}s")

# Run the load test
asyncio.run(load_test("api.prompty", concurrent_requests=5, total_requests=50))
```
Section titled “Production Optimization”Connection Pool Configuration
Section titled “Connection Pool Configuration”Optimize connection pools for production:
```python
# For Azure OpenAI
import httpx

# Configure an HTTP client with connection pooling
http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0,
    ),
    timeout=httpx.Timeout(30.0),
)

# Use in the configuration
config = {
    "type": "azure_openai",
    "azure_endpoint": "https://your-endpoint.openai.azure.com/",
    "http_client": http_client,
}
```
### Rate Limiting

Implement rate limiting to prevent API quota exhaustion:
```python
import asyncio
import time
from asyncio import Semaphore
import prompty

class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.semaphore = Semaphore(requests_per_minute)
        self.request_times = []

    async def acquire(self):
        await self.semaphore.acquire()

        current_time = time.time()
        # Discard requests older than one minute
        self.request_times = [
            t for t in self.request_times if current_time - t < 60
        ]

        if len(self.request_times) >= self.requests_per_minute:
            # Wait until another request is allowed
            sleep_time = 60 - (current_time - self.request_times[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.request_times.append(current_time)

    def release(self):
        self.semaphore.release()

# Usage
rate_limiter = RateLimiter(requests_per_minute=30)

async def rate_limited_execute(prompt_path, **kwargs):
    await rate_limiter.acquire()
    try:
        return await prompty.execute_async(prompt_path, **kwargs)
    finally:
        rate_limiter.release()
```
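The acquire/release pattern above relies on every caller remembering the try/finally. One option is wrapping it in an async context manager. This standalone sketch uses a minimal `SimpleLimiter` stand-in with the same acquire/release interface, since it is not part of the Prompty API:

```python
import asyncio
from contextlib import asynccontextmanager

class SimpleLimiter:
    """Minimal stand-in with the same acquire/release interface."""
    def __init__(self, max_concurrent=2):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def acquire(self):
        await self.semaphore.acquire()

    def release(self):
        self.semaphore.release()

@asynccontextmanager
async def limited(limiter):
    """Guarantee release() even if the request raises."""
    await limiter.acquire()
    try:
        yield
    finally:
        limiter.release()

async def demo():
    limiter = SimpleLimiter()
    async with limited(limiter):
        return "executed under the rate limit"

print(asyncio.run(demo()))
```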
## Best Practices Summary

- Keep prompts concise and request structured output to cut token usage.
- Load prompts once, reuse connections, and batch or stream requests where possible.
- Cache responses and compiled templates for repeated work.
- Match the model and its parameters to the complexity of the task.
- Monitor, profile, and load-test before tuning further.
- Rate-limit requests to stay within API quotas.

## Next Steps
Section titled “Next Steps”- Learn about Observability & Tracing for monitoring
- Explore Configuration for optimization settings
- Check out CLI Usage for performance testing