Deployment Best Practices
Deploying Prompty applications to production requires careful consideration of security, scalability, monitoring, and operational concerns. This guide covers best practices for production deployments.
Environment Setup
Section titled “Environment Setup”

Production Environment Configuration

Section titled “Production Environment Configuration”

Structure your deployment with clear environment separation:

# Project structure
my-prompty-app/
├── prompts/
│   ├── customer_support.prompty
│   ├── content_generation.prompty
│   └── data_analysis.prompty
├── config/
│   ├── prompty.json
│   ├── prompty.dev.json
│   ├── prompty.staging.json
│   └── prompty.prod.json
├── environment/
│   ├── .env.example
│   ├── .env.dev
│   ├── .env.staging
│   └── .env.prod        # Never commit to version control
├── app.py
├── requirements.txt
└── Dockerfile

Environment-Specific Configuration
Section titled “Environment-Specific Configuration”

Create environment-specific configuration files:

{
  "connections": {
    "default": {
      "type": "azure_openai",
      "azure_endpoint": "${env:AZURE_OPENAI_ENDPOINT}",
      "api_version": "2024-10-21",
      "connection_pool_size": 20,
      "timeout": 30,
      "retry_count": 3
    }
  },
  "defaults": {
    "temperature": 0.7,
    "max_tokens": 1000,
    "top_p": 1.0
  },
  "security": {
    "sanitize_logs": true,
    "mask_sensitive_data": true
  },
  "monitoring": {
    "enable_tracing": true,
    "trace_sampling_rate": 0.1
  }
}

Secrets Management
Section titled “Secrets Management”

Never hardcode secrets. Use proper secrets management:

# ❌ Don't do this
config = {
    "api_key": "sk-1234567890abcdef"  # Hardcoded secret
}

# ✅ Use environment variables
import os

config = {
    "api_key": os.getenv("AZURE_OPENAI_API_KEY")
}

# ✅ Use Azure Key Vault
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
client = SecretClient(vault_url="https://your-vault.vault.azure.net/", credential=credential)
api_key = client.get_secret("azure-openai-api-key").value

Containerization
Section titled “Containerization”

Dockerfile Best Practices

Section titled “Dockerfile Best Practices”

Create an optimized Dockerfile:

# Use official Python slim image
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd --create-home --shell /bin/bash prompty
USER prompty

# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Expose port
EXPOSE 8000

# Run application
CMD ["python", "app.py"]

Multi-stage Build
Section titled “Multi-stage Build”

Optimize image size with multi-stage builds:

# Build stage
FROM python:3.11 AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.11-slim

WORKDIR /app

# Copy dependencies from builder stage
COPY --from=builder /root/.local /root/.local

# Copy application
COPY . .

# Make sure scripts in .local are usable
ENV PATH=/root/.local/bin:$PATH

# Create non-root user
RUN useradd --create-home --shell /bin/bash prompty
USER prompty

CMD ["python", "app.py"]

Application Architecture
Section titled “Application Architecture”

Production-Ready Application Structure

Section titled “Production-Ready Application Structure”

import os
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import prompty
import prompty.azure
from prompty.tracer import Tracer, PromptyTracer

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Prompty API",
    description="Production Prompty application",
    version="1.0.0"
)

# Configure tracing for production
if os.getenv("ENABLE_TRACING", "false").lower() == "true":
    tracer = PromptyTracer(output_dir="./traces")
    Tracer.add("production", tracer.tracer)

class PromptRequest(BaseModel):
    prompt_name: str
    inputs: dict
    configuration: dict = None

class PromptResponse(BaseModel):
    result: str
    metadata: dict

# Health check endpoint
@app.get("/health")
async def health_check():
    try:
        # Test prompty functionality
        test_result = prompty.execute(
            prompty.headless(
                api="chat",
                content="Test",
                connection="default"
            )
        )
        return {"status": "healthy", "timestamp": "2024-01-15T10:30:00Z"}
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Service unavailable")

@app.post("/execute", response_model=PromptResponse)
async def execute_prompt(request: PromptRequest):
    try:
        logger.info(f"Executing prompt: {request.prompt_name}")

        result = prompty.execute(
            f"prompts/{request.prompt_name}.prompty",
            inputs=request.inputs,
            configuration=request.configuration or {}
        )

        return PromptResponse(
            result=result,
            metadata={
                "prompt_name": request.prompt_name,
                "input_count": len(request.inputs)
            }
        )

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Prompt not found")
    except Exception as e:
        logger.error(f"Execution failed: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=int(os.getenv("PORT", 8000)),
        log_level="info"
    )

Security Best Practices
Section titled “Security Best Practices”

Input Validation

Section titled “Input Validation”

Validate all inputs to prevent injection attacks:

from pydantic import BaseModel, validator
import re

class SecurePromptRequest(BaseModel):
    prompt_name: str
    inputs: dict

    @validator('prompt_name')
    def validate_prompt_name(cls, v):
        # Only allow alphanumeric characters and underscores
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Invalid prompt name')
        return v

    @validator('inputs')
    def validate_inputs(cls, v):
        # Prevent excessively large inputs
        max_input_size = 10000  # characters
        total_size = sum(len(str(value)) for value in v.values())
        if total_size > max_input_size:
            raise ValueError('Input too large')
        return v

Rate Limiting
Section titled “Rate Limiting”

Implement rate limiting to prevent abuse:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/execute")
@limiter.limit("10/minute")  # 10 requests per minute per IP
async def execute_prompt(request: Request, prompt_request: PromptRequest):
    # Your execution logic here
    pass

Authentication and Authorization
Section titled “Authentication and Authorization”

Implement proper authentication:

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv("JWT_SECRET"),
            algorithms=["HS256"]
        )
        return payload
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )

@app.post("/execute")
async def execute_prompt(
    request: PromptRequest,
    user: dict = Depends(verify_token)
):
    # Check user permissions
    if not user.get("can_execute_prompts"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    # Execute prompt logic
    pass

Monitoring and Observability
Section titled “Monitoring and Observability”

Comprehensive Logging

Section titled “Comprehensive Logging”

Implement structured logging:

import structlog
from pythonjsonlogger import jsonlogger

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

@app.post("/execute")
async def execute_prompt(request: PromptRequest):
    logger.info(
        "Prompt execution started",
        prompt_name=request.prompt_name,
        input_keys=list(request.inputs.keys()),
        user_id=get_current_user_id()
    )

    try:
        result = prompty.execute(
            f"prompts/{request.prompt_name}.prompty",
            inputs=request.inputs
        )

        logger.info(
            "Prompt execution completed",
            prompt_name=request.prompt_name,
            result_length=len(str(result))
        )

        return result

    except Exception as e:
        logger.error(
            "Prompt execution failed",
            prompt_name=request.prompt_name,
            error=str(e),
            error_type=type(e).__name__
        )
        raise

Metrics Collection
Section titled “Metrics Collection”

Collect application metrics:

from prometheus_client import Counter, Histogram, generate_latest
import time

# Define metrics
PROMPT_EXECUTIONS = Counter(
    'prompty_executions_total',
    'Total number of prompt executions',
    ['prompt_name', 'status']
)

PROMPT_DURATION = Histogram(
    'prompty_execution_duration_seconds',
    'Time spent executing prompts',
    ['prompt_name']
)

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")

@app.post("/execute")
async def execute_prompt(request: PromptRequest):
    start_time = time.time()

    try:
        result = prompty.execute(
            f"prompts/{request.prompt_name}.prompty",
            inputs=request.inputs
        )

        PROMPT_EXECUTIONS.labels(
            prompt_name=request.prompt_name,
            status="success"
        ).inc()

        return result

    except Exception as e:
        PROMPT_EXECUTIONS.labels(
            prompt_name=request.prompt_name,
            status="error"
        ).inc()
        raise

    finally:
        PROMPT_DURATION.labels(
            prompt_name=request.prompt_name
        ).observe(time.time() - start_time)

Deployment Strategies
Section titled “Deployment Strategies”

Blue-Green Deployment

Section titled “Blue-Green Deployment”

Implement zero-downtime deployments:

# docker-compose.yml for blue-green deployment
version: '3.8'

services:
  prompty-blue:
    build: .
    environment:
      - ENVIRONMENT=production
      - DEPLOYMENT_SLOT=blue
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.prompty-blue.rule=Host(`api.example.com`) && Headers(`X-Deployment-Slot`, `blue`)"

  prompty-green:
    build: .
    environment:
      - ENVIRONMENT=production
      - DEPLOYMENT_SLOT=green
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.prompty-green.rule=Host(`api.example.com`) && Headers(`X-Deployment-Slot`, `green`)"

  traefik:
    image: traefik:v2.10
    command:
      - --api.insecure=true
      - --providers.docker=true
    ports:
      - "80:80"
      - "8080:8080"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Kubernetes Deployment
Section titled “Kubernetes Deployment”

Deploy to Kubernetes with proper resource management:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: prompty-app
  labels:
    app: prompty-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: prompty-app
  template:
    metadata:
      labels:
        app: prompty-app
    spec:
      containers:
      - name: prompty-app
        image: your-registry/prompty-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: AZURE_OPENAI_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: prompty-secrets
              key: azure-openai-endpoint
        - name: AZURE_OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: prompty-secrets
              key: azure-openai-api-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: prompty-service
spec:
  selector:
    app: prompty-app
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

CI/CD Pipeline
Section titled “CI/CD Pipeline”

GitHub Actions Workflow

Section titled “GitHub Actions Workflow”

Automate testing and deployment:

name: Deploy Prompty App

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest

      - name: Run tests
        run: pytest tests/
        env:
          AZURE_OPENAI_ENDPOINT: ${{ secrets.TEST_AZURE_OPENAI_ENDPOINT }}
          AZURE_OPENAI_API_KEY: ${{ secrets.TEST_AZURE_OPENAI_API_KEY }}

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v2

      - name: Build and push Docker image
        run: |
          docker build -t ${{ secrets.REGISTRY_URL }}/prompty-app:${{ github.sha }} .
          docker push ${{ secrets.REGISTRY_URL }}/prompty-app:${{ github.sha }}

      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/prompty-app prompty-app=${{ secrets.REGISTRY_URL }}/prompty-app:${{ github.sha }}
          kubectl rollout status deployment/prompty-app

Error Handling and Recovery
Section titled “Error Handling and Recovery”

Resilient Error Handling

Section titled “Resilient Error Handling”

Implement comprehensive error handling:

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

class PromptyService:
    def __init__(self):
        self.circuit_breaker = CircuitBreaker()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def execute_with_retry(self, prompt_path, inputs, **kwargs):
        try:
            if self.circuit_breaker.is_open():
                raise Exception("Circuit breaker is open")

            result = await prompty.execute_async(
                prompt_path,
                inputs=inputs,
                **kwargs
            )

            self.circuit_breaker.record_success()
            return result

        except Exception as e:
            self.circuit_breaker.record_failure()
            logger.error(f"Prompt execution failed: {e}")
            raise

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open

    def is_open(self):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
                return False
            return True
        return False

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = "open"

Performance Monitoring
Section titled “Performance Monitoring”

Application Performance Monitoring

Section titled “Application Performance Monitoring”

Monitor key performance indicators:

import psutil
import time
from dataclasses import dataclass

@dataclass
class PerformanceMetrics:
    cpu_percent: float
    memory_percent: float
    active_connections: int
    response_time_avg: float
    request_rate: float

class PerformanceMonitor:
    def __init__(self):
        self.request_times = []
        self.start_time = time.time()
        self.request_count = 0

    def record_request(self, response_time):
        self.request_times.append(response_time)
        self.request_count += 1

        # Keep only last 100 requests for memory efficiency
        if len(self.request_times) > 100:
            self.request_times.pop(0)

    def get_metrics(self) -> PerformanceMetrics:
        cpu_percent = psutil.cpu_percent()
        memory_percent = psutil.virtual_memory().percent

        avg_response_time = (
            sum(self.request_times) / len(self.request_times)
            if self.request_times else 0
        )

        uptime = time.time() - self.start_time
        request_rate = self.request_count / uptime if uptime > 0 else 0

        return PerformanceMetrics(
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            active_connections=len(self.request_times),
            response_time_avg=avg_response_time,
            request_rate=request_rate
        )

# Global performance monitor
perf_monitor = PerformanceMonitor()

@app.middleware("http")
async def add_performance_monitoring(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time

    perf_monitor.record_request(process_time)

    return response

@app.get("/metrics/performance")
async def get_performance_metrics():
    return perf_monitor.get_metrics()

Best Practices Summary
Section titled “Best Practices Summary”

Next Steps
Section titled “Next Steps”- Learn about Performance Optimization for scaling
- Explore Observability & Tracing for monitoring
- Check out Configuration for environment management
Want to contribute to the project? Updated guidance is coming soon.