Skip to content

Deployment Best Practices

Deploying Prompty applications to production requires careful consideration of security, scalability, monitoring, and operational concerns. This guide covers best practices for production deployments.

Structure your deployment with clear environment separation:

Terminal window
# Project structure
my-prompty-app/
├── prompts/
│   ├── customer_support.prompty
│   ├── content_generation.prompty
│   └── data_analysis.prompty
├── config/
│   ├── prompty.json
│   ├── prompty.dev.json
│   ├── prompty.staging.json
│   └── prompty.prod.json
├── environment/
│   ├── .env.example
│   ├── .env.dev
│   ├── .env.staging
│   └── .env.prod        # Never commit to version control
├── app.py
├── requirements.txt
└── Dockerfile

Create environment-specific configuration files:

config/prompty.prod.json
{
  "connections": {
    "default": {
      "type": "azure_openai",
      "azure_endpoint": "${env:AZURE_OPENAI_ENDPOINT}",
      "api_version": "2024-10-21",
      "connection_pool_size": 20,
      "timeout": 30,
      "retry_count": 3
    }
  },
  "defaults": {
    "temperature": 0.7,
    "max_tokens": 1000,
    "top_p": 1.0
  },
  "security": {
    "sanitize_logs": true,
    "mask_sensitive_data": true
  },
  "monitoring": {
    "enable_tracing": true,
    "trace_sampling_rate": 0.1
  }
}

Never hardcode secrets. Use proper secrets management:

# Three patterns for supplying the API key, from worst to best.

# ❌ Don't do this
config = {
"api_key": "sk-1234567890abcdef" # Hardcoded secret
}
# ✅ Use environment variables
import os
config = {
"api_key": os.getenv("AZURE_OPENAI_API_KEY")
}
# ✅ Use Azure Key Vault
# Requires the azure-keyvault-secrets and azure-identity packages.
# DefaultAzureCredential resolves managed identity, Azure CLI login, or
# environment variables in turn — no secret ever lives in the codebase.
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
client = SecretClient(vault_url="https://your-vault.vault.azure.net/", credential=credential)
api_key = client.get_secret("azure-openai-api-key").value

Create an optimized Dockerfile:

# Single-stage production image for the Prompty API.
# Use official Python slim image
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
--no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
# NOTE(review): assumes a .dockerignore excludes .env* files — confirm,
# otherwise secrets get baked into the image layer.
COPY . .
# Create non-root user (drop privileges so the app does not run as root)
RUN useradd --create-home --shell /bin/bash prompty
USER prompty
# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Expose port
EXPOSE 8000
# Run application
CMD ["python", "app.py"]

Optimize image size with multi-stage builds:

# Build stage: install dependencies with the full toolchain available.
FROM python:3.11 AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Production stage: slim runtime image running as a non-root user.
FROM python:3.11-slim
WORKDIR /app
# Create the non-root user FIRST so copied files can be owned by it.
RUN useradd --create-home --shell /bin/bash prompty
# Copy dependencies into the runtime user's home. Copying to /root/.local
# (as the original did) leaves them unreadable once we switch to the
# non-root user, since /root is not world-readable.
COPY --from=builder --chown=prompty:prompty /root/.local /home/prompty/.local
# Copy application, owned by the runtime user
COPY --chown=prompty:prompty . .
# Make sure scripts in .local are usable
ENV PATH=/home/prompty/.local/bin:$PATH
USER prompty
CMD ["python", "app.py"]
app.py
import os
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import prompty
import prompty.azure
from prompty.tracer import Tracer, PromptyTracer

# Configure application-wide logging once, at import time.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Prompty API",
    description="Production Prompty application",
    version="1.0.0"
)

# Enable Prompty tracing only when explicitly requested via the environment,
# so local runs and tests stay trace-free by default.
if os.getenv("ENABLE_TRACING", "false").lower() == "true":
    tracer = PromptyTracer(output_dir="./traces")
    Tracer.add("production", tracer.tracer)
class PromptRequest(BaseModel):
    """Request payload for /execute: which prompt to run and with what inputs."""
    prompt_name: str
    inputs: dict
    # Optional per-request overrides; `dict = None` was a lying annotation,
    # so make the optionality explicit.
    configuration: dict | None = None


class PromptResponse(BaseModel):
    """Response payload: the model output plus execution metadata."""
    result: str
    metadata: dict
# Health check endpoint
@app.get("/health")
async def health_check():
    """Liveness probe: run a minimal headless prompt to verify model connectivity.

    Returns 200 with a status document, or 503 if the probe execution fails.
    """
    try:
        # A tiny headless execution exercises the full connection path.
        test_result = prompty.execute(
            prompty.headless(
                api="chat",
                content="Test",
                connection="default"
            )
        )
        # Report the actual check time — the original returned a hardcoded
        # placeholder timestamp.
        from datetime import datetime, timezone
        return {
            "status": "healthy",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Service unavailable")
@app.post("/execute", response_model=PromptResponse)
async def execute_prompt(request: PromptRequest):
    """Execute a named .prompty file with the supplied inputs.

    Raises 400 for unsafe prompt names, 404 if the prompt file is missing,
    500 for any other execution failure.
    """
    import re  # local import: only needed for the name check below

    # prompt_name is interpolated into a filesystem path; reject anything
    # that could traverse outside prompts/ (e.g. "../../etc/passwd").
    if not re.fullmatch(r"[A-Za-z0-9_]+", request.prompt_name):
        raise HTTPException(status_code=400, detail="Invalid prompt name")
    try:
        logger.info(f"Executing prompt: {request.prompt_name}")
        result = prompty.execute(
            f"prompts/{request.prompt_name}.prompty",
            inputs=request.inputs,
            configuration=request.configuration or {}
        )
        return PromptResponse(
            result=result,
            metadata={
                "prompt_name": request.prompt_name,
                "input_count": len(request.inputs)
            }
        )
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Prompt not found")
    except Exception as e:
        logger.error(f"Execution failed: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
if __name__ == "__main__":
    import uvicorn

    # Bind to all interfaces; the port is configurable for platforms that
    # inject PORT (e.g. Azure App Service, Cloud Run).
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=int(os.getenv("PORT", 8000)),
        log_level="info"
    )

Validate all inputs to prevent injection attacks:

from pydantic import BaseModel, validator
import re
class SecurePromptRequest(BaseModel):
    """Prompt request with input validation hardened against abuse."""
    prompt_name: str
    inputs: dict

    @validator('prompt_name')
    def validate_prompt_name(cls, v):
        # Only allow alphanumeric characters and underscores — anything else
        # (slashes, dots) could escape the prompts/ directory when the name
        # is turned into a file path.
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Invalid prompt name')
        return v

    @validator('inputs')
    def validate_inputs(cls, v):
        # Cap the combined input size to bound token usage and memory.
        max_input_size = 10000  # characters
        total_size = sum(len(str(value)) for value in v.values())
        if total_size > max_input_size:
            raise ValueError('Input too large')
        return v

Implement rate limiting to prevent abuse:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# One shared limiter keyed by client IP address.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/execute")
@limiter.limit("10/minute")  # 10 requests per minute per IP
async def execute_prompt(request: Request, prompt_request: PromptRequest):
    # slowapi requires the raw `request: Request` parameter to resolve the
    # client address. NOTE(review): `Request` must be imported from fastapi
    # for this snippet to run — confirm the import exists in your app module.
    # Your execution logic here
    pass

Implement proper authentication:

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode and verify the bearer JWT; return its claims payload.

    Raises 401 for any signature, expiry, or decoding failure.
    """
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv("JWT_SECRET"),
            algorithms=["HS256"]  # pin the algorithm to block alg-swap attacks
        )
        return payload
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )


@app.post("/execute")
async def execute_prompt(
    request: PromptRequest,
    user: dict = Depends(verify_token)
):
    # Check user permissions — authentication alone is not authorization.
    if not user.get("can_execute_prompts"):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    # Execute prompt logic
    pass

Implement structured logging:

import structlog
from pythonjsonlogger import jsonlogger
# Configure structured logging: every record is rendered as a single JSON
# document with an ISO timestamp, level, and logger name attached.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)
logger = structlog.get_logger()


@app.post("/execute")
async def execute_prompt(request: PromptRequest):
    """Execute a prompt, emitting structured start/complete/fail events."""
    # Log key/value pairs (not interpolated strings) so aggregators can index
    # them. NOTE(review): get_current_user_id() is assumed to be defined
    # elsewhere in the application — confirm.
    logger.info(
        "Prompt execution started",
        prompt_name=request.prompt_name,
        input_keys=list(request.inputs.keys()),
        user_id=get_current_user_id()
    )
    try:
        result = prompty.execute(
            f"prompts/{request.prompt_name}.prompty",
            inputs=request.inputs
        )
        logger.info(
            "Prompt execution completed",
            prompt_name=request.prompt_name,
            result_length=len(str(result))
        )
        return result
    except Exception as e:
        logger.error(
            "Prompt execution failed",
            prompt_name=request.prompt_name,
            error=str(e),
            error_type=type(e).__name__
        )
        raise

Collect application metrics:

from prometheus_client import Counter, Histogram, generate_latest
import time
from fastapi import Response  # required by the /metrics endpoint below

# Define metrics
PROMPT_EXECUTIONS = Counter(
    'prompty_executions_total',
    'Total number of prompt executions',
    ['prompt_name', 'status']
)
PROMPT_DURATION = Histogram(
    'prompty_execution_duration_seconds',
    'Time spent executing prompts',
    ['prompt_name']
)


@app.get("/metrics")
async def metrics():
    """Expose collected metrics in the Prometheus text format."""
    return Response(generate_latest(), media_type="text/plain")


@app.post("/execute")
async def execute_prompt(request: PromptRequest):
    """Execute a prompt, counting outcomes and timing every attempt."""
    start_time = time.time()
    try:
        result = prompty.execute(
            f"prompts/{request.prompt_name}.prompty",
            inputs=request.inputs
        )
        PROMPT_EXECUTIONS.labels(
            prompt_name=request.prompt_name,
            status="success"
        ).inc()
        return result
    except Exception:
        PROMPT_EXECUTIONS.labels(
            prompt_name=request.prompt_name,
            status="error"
        ).inc()
        raise
    finally:
        # Observe duration on both the success and error paths.
        PROMPT_DURATION.labels(
            prompt_name=request.prompt_name
        ).observe(time.time() - start_time)

Implement zero-downtime deployments:

# docker-compose.yml for blue-green deployment
version: '3.8'
services:
  prompty-blue:
    build: .
    environment:
      - ENVIRONMENT=production
      - DEPLOYMENT_SLOT=blue
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.prompty-blue.rule=Host(`api.example.com`) && Headers(`X-Deployment-Slot`, `blue`)"
  prompty-green:
    build: .
    environment:
      - ENVIRONMENT=production
      - DEPLOYMENT_SLOT=green
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.prompty-green.rule=Host(`api.example.com`) && Headers(`X-Deployment-Slot`, `green`)"
  traefik:
    image: traefik:v2.10
    command:
      # NOTE(review): insecure API exposes the dashboard without auth on
      # port 8080 — disable or secure it in real production.
      - --api.insecure=true
      - --providers.docker=true
    ports:
      - "80:80"
      - "8080:8080"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Deploy to Kubernetes with proper resource management:

k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prompty-app
  labels:
    app: prompty-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: prompty-app
  template:
    metadata:
      labels:
        app: prompty-app
    spec:
      containers:
        - name: prompty-app
          image: your-registry/prompty-app:latest
          ports:
            - containerPort: 8000
          # Credentials come from a Secret, never from the pod spec itself.
          env:
            - name: AZURE_OPENAI_ENDPOINT
              valueFrom:
                secretKeyRef:
                  name: prompty-secrets
                  key: azure-openai-endpoint
            - name: AZURE_OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: prompty-secrets
                  key: azure-openai-api-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: prompty-service
spec:
  selector:
    app: prompty-app
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer

Automate testing and deployment:

.github/workflows/deploy.yml
name: Deploy Prompty App

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      # checkout@v2 / setup-python@v2 run on deprecated Node 12 runtimes;
      # use the current major versions.
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest
      - name: Run tests
        run: pytest tests/
        env:
          AZURE_OPENAI_ENDPOINT: ${{ secrets.TEST_AZURE_OPENAI_ENDPOINT }}
          AZURE_OPENAI_API_KEY: ${{ secrets.TEST_AZURE_OPENAI_API_KEY }}

  deploy:
    needs: test
    runs-on: ubuntu-latest
    # Deploy only on pushes to main, never from pull requests.
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Build and push Docker image
        run: |
          docker build -t ${{ secrets.REGISTRY_URL }}/prompty-app:${{ github.sha }} .
          docker push ${{ secrets.REGISTRY_URL }}/prompty-app:${{ github.sha }}
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/prompty-app prompty-app=${{ secrets.REGISTRY_URL }}/prompty-app:${{ github.sha }}
          kubectl rollout status deployment/prompty-app

Implement comprehensive error handling:

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential


class PromptyService:
    """Executes prompts with retry and circuit-breaker protection."""

    def __init__(self):
        self.circuit_breaker = CircuitBreaker()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def execute_with_retry(self, prompt_path, inputs, **kwargs):
        """Run a prompt asynchronously; tenacity retries with exponential backoff."""
        try:
            # Fail fast while the breaker is open instead of hammering a
            # downstream service that is already unhealthy.
            if self.circuit_breaker.is_open():
                raise Exception("Circuit breaker is open")
            result = await prompty.execute_async(
                prompt_path,
                inputs=inputs,
                **kwargs
            )
            self.circuit_breaker.record_success()
            return result
        except Exception as e:
            # Every failure (including breaker-open fast-fails) counts
            # toward opening the breaker. NOTE(review): `logger` is assumed
            # to be defined at module level — confirm.
            self.circuit_breaker.record_failure()
            logger.error(f"Prompt execution failed: {e}")
            raise
import time  # used for the open-state cool-down; missing from the original snippet


class CircuitBreaker:
    """Minimal circuit breaker: closed -> open after N failures -> half-open after a cool-down."""

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.timeout = timeout                      # seconds to stay open before probing
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open

    def is_open(self):
        """Return True while calls should be rejected."""
        if self.state == "open":
            # After the cool-down, let a single probe call through.
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
                return False
            return True
        return False

    def record_success(self):
        # Any success fully resets the breaker.
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

Monitor key performance indicators:

import psutil
import time
from dataclasses import dataclass
@dataclass
class PerformanceMetrics:
    """Point-in-time snapshot of system and request-level health figures."""
    cpu_percent: float        # system CPU usage, 0-100 (from psutil)
    memory_percent: float     # system virtual-memory usage, 0-100
    active_connections: int   # NOTE(review): producers fill this with the
                              # recent-sample count, not true connections
    response_time_avg: float  # seconds, averaged over the recent window
    request_rate: float       # requests/second since monitor start
class PerformanceMonitor:
def __init__(self):
self.request_times = []
self.start_time = time.time()
self.request_count = 0
def record_request(self, response_time):
self.request_times.append(response_time)
self.request_count += 1
# Keep only last 100 requests for memory efficiency
if len(self.request_times) > 100:
self.request_times.pop(0)
def get_metrics(self) -> PerformanceMetrics:
cpu_percent = psutil.cpu_percent()
memory_percent = psutil.virtual_memory().percent
avg_response_time = (
sum(self.request_times) / len(self.request_times)
if self.request_times else 0
)
uptime = time.time() - self.start_time
request_rate = self.request_count / uptime if uptime > 0 else 0
return PerformanceMetrics(
cpu_percent=cpu_percent,
memory_percent=memory_percent,
active_connections=len(self.request_times),
response_time_avg=avg_response_time,
request_rate=request_rate
)
# Global performance monitor shared by the middleware and the endpoint.
perf_monitor = PerformanceMonitor()


@app.middleware("http")
async def add_performance_monitoring(request: Request, call_next):
    """Time every HTTP request and feed its duration into the monitor."""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    perf_monitor.record_request(process_time)
    return response


@app.get("/metrics/performance")
async def get_performance_metrics():
    """Return the current KPI snapshot."""
    return perf_monitor.get_metrics()

Want to contribute to the project? Updated guidance is coming soon.