Python SDK
FastAPI Integration
Complete FastAPI integration with Pydantic v2 models, dependency injection, lifespan context management, and production-ready patterns.
This guide uses modern FastAPI patterns including lifespan context managers, Pydantic v2 Settings, and modular APIRouter architecture.
Project Structure
my_fastapi_app/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app factory
│ ├── config.py # Pydantic Settings
│ ├── dependencies.py # Shared dependencies
│ ├── lifespan.py # Lifespan context manager
│ ├── middleware.py # Custom middleware
│ ├── exceptions.py # Custom exceptions & handlers
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── messages.py # Message endpoints
│ │ ├── webhooks.py # Webhook handlers
│ │ └── health.py # Health checks
│ ├── services/
│ │ ├── __init__.py
│ │ └── sent_service.py # Sent client service
│ ├── models/
│ │ ├── __init__.py
│ │ └── schemas.py # Pydantic models
│ └── utils/
│ ├── __init__.py
│ └── logging.py # Logging configuration
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Pytest fixtures
│ ├── test_messages.py
│ └── test_webhooks.py
├── .env
├── pyproject.toml
└── requirements.txtInstallation
pip install sentdm fastapi uvicorn pydantic-settings slowapi
# For improved async HTTP performance
pip install sentdm[aiohttp]
# Development dependencies
pip install pytest pytest-asyncio httpxConfiguration
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field, field_validator
from functools import lru_cache
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
app_name: str = Field(default="Sent DM API", alias="APP_NAME")
app_version: str = Field(default="1.0.0", alias="APP_VERSION")
debug: bool = Field(default=False, alias="DEBUG")
sent_dm_api_key: str = Field(alias="SENT_DM_API_KEY")
sent_dm_webhook_secret: str | None = Field(default=None, alias="SENT_DM_WEBHOOK_SECRET")
sent_dm_base_url: str | None = Field(default=None, alias="SENT_DM_BASE_URL")
rate_limit: str = Field(default="100/minute", alias="RATE_LIMIT")
log_level: str = Field(default="INFO", alias="LOG_LEVEL")
@field_validator("sent_dm_api_key")
@classmethod
def validate_api_key(cls, v: str) -> str:
if not v or len(v) < 10:
raise ValueError("SENT_DM_API_KEY must be a valid API key")
return v
@lru_cache
def get_settings() -> Settings:
return Settings()# .env
SENT_DM_API_KEY=your_api_key_here
SENT_DM_WEBHOOK_SECRET=your_webhook_secret
DEBUG=false
LOG_LEVEL=INFO
RATE_LIMIT=100/minuteAsync Client Manager with Lifespan
# app/lifespan.py
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from fastapi import FastAPI
from sent_dm import AsyncSentDm
from app.config import get_settings
class SentClientManager:
def __init__(self) -> None:
self._client: AsyncSentDm | None = None
async def initialize(self) -> None:
settings = get_settings()
self._client = AsyncSentDm(
api_key=settings.sent_dm_api_key,
base_url=settings.sent_dm_base_url,
)
await self._client.__aenter__()
async def cleanup(self) -> None:
if self._client:
await self._client.__aexit__(None, None, None)
self._client = None
@property
def client(self) -> AsyncSentDm:
if self._client is None:
raise RuntimeError("Sent client not initialized")
return self._client
sent_manager = SentClientManager()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
await sent_manager.initialize()
yield
await sent_manager.cleanup()Schemas
# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator
from typing import Literal, Any
from datetime import datetime
class TemplateRequest(BaseModel):
id: str = Field(..., min_length=1)
name: str = Field(..., min_length=1)
parameters: dict[str, str] = Field(default_factory=dict)
class SendMessageRequest(BaseModel):
phone_numbers: list[str] = Field(..., min_length=1, max_length=100)
template: TemplateRequest
channels: list[Literal["sms", "whatsapp", "telegram"]] = Field(default=["whatsapp"])
test_mode: bool = Field(default=False)
@field_validator("phone_numbers")
@classmethod
def validate_phone_numbers(cls, v: list[str]) -> list[str]:
for phone in v:
if not phone.startswith("+"):
raise ValueError(f"Phone number '{phone}' must start with '+' (E.164 format)")
return v
class SendMessageResponse(BaseModel):
message_id: str
status: str
recipient: str
sent_at: datetime = Field(default_factory=datetime.utcnow)
class BatchSendResponse(BaseModel):
total_requested: int
successful: int
failed: int
messages: list[SendMessageResponse]
errors: list[dict[str, Any]] = Field(default_factory=list)
class WebhookEvent(BaseModel):
type: str
data: dict[str, Any]
timestamp: datetime | None = Field(default=None)
class HealthCheckResponse(BaseModel):
status: Literal["healthy", "unhealthy"]
version: str
timestamp: datetime
services: dict[str, Literal["up", "down"]]Dependencies
# app/dependencies.py
from fastapi import Request, Depends, Header, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sent_dm import AsyncSentDm
from app.config import Settings, get_settings
from app.lifespan import sent_manager
security = HTTPBearer(auto_error=False)
def get_sent_client() -> AsyncSentDm:
return sent_manager.client
async def verify_webhook_signature(
request: Request,
x_webhook_signature: str | None = Header(None),
settings: Settings = Depends(get_settings),
) -> bytes:
if not x_webhook_signature:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing X-Webhook-Signature header",
)
if not settings.sent_dm_webhook_secret:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Webhook secret not configured",
)
payload = await request.body()
client = sent_manager.client
is_valid = client.webhooks.verify_signature(
payload=payload.decode(),
signature=x_webhook_signature,
secret=settings.sent_dm_webhook_secret,
)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid webhook signature",
)
return payload
async def get_optional_token(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> str | None:
return credentials.credentials if credentials else NoneExceptions
# app/exceptions.py
from fastapi import Request, FastAPI
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from sent_dm import APIError
import logging
logger = logging.getLogger(__name__)
class SentServiceException(Exception):
def __init__(self, message: str, status_code: int = 500):
self.message = message
self.status_code = status_code
super().__init__(self.message)
async def sent_api_exception_handler(request: Request, exc: APIError) -> JSONResponse:
logger.error(
"Sent API Error",
extra={"status": exc.status, "error": exc.name, "path": request.url.path},
)
status_code = {400: 400, 401: 401, 403: 403, 404: 404, 422: 422, 429: 429}.get(exc.status, 500)
return JSONResponse(
status_code=status_code,
content={"error": exc.name, "message": exc.message, "status_code": status_code},
)
async def validation_exception_handler(
request: Request, exc: RequestValidationError
) -> JSONResponse:
errors = [
{"field": ".".join(str(x) for x in error["loc"]), "message": error["msg"], "type": error["type"]}
for error in exc.errors()
]
return JSONResponse(
status_code=422,
content={"error": "Validation Error", "message": "Request validation failed", "details": errors},
)
async def http_exception_handler(request: Request, exc: StarletteHTTPException) -> JSONResponse:
return JSONResponse(
status_code=exc.status_code,
content={"error": "HTTP Error", "message": exc.detail, "status_code": exc.status_code},
)
def register_exception_handlers(app: FastAPI) -> None:
app.add_exception_handler(APIError, sent_api_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(StarletteHTTPException, http_exception_handler)Middleware
# app/middleware.py
import time
import logging
import uuid
from typing import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
logger = logging.getLogger(__name__)
class RequestIdMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable) -> Response:
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable) -> Response:
start_time = time.time()
request_id = getattr(request.state, "request_id", "unknown")
logger.info(
"Request started",
extra={"request_id": request_id, "method": request.method, "path": request.url.path},
)
try:
response = await call_next(request)
except Exception as exc:
logger.error("Request failed", extra={"request_id": request_id, "error": str(exc)})
raise
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
logger.info(
"Request completed",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"duration_ms": round(process_time * 1000, 2),
},
)
return responseService Layer
# app/services/sent_service.py
import logging
from typing import Any
from sent_dm import AsyncSentDm
from app.models.schemas import SendMessageRequest, SendMessageResponse, BatchSendResponse
logger = logging.getLogger(__name__)
class SentService:
def __init__(self, client: AsyncSentDm):
self._client = client
async def send_message(self, request: SendMessageRequest) -> SendMessageResponse:
response = await self._client.messages.send(
to=request.phone_numbers,
template={
"id": request.template.id,
"name": request.template.name,
"parameters": request.template.parameters,
},
channels=request.channels,
test_mode=request.test_mode,
)
message = response.data.messages[0]
logger.info(
"Message sent",
extra={"message_id": message.id, "recipient": request.phone_numbers[0], "status": message.status},
)
return SendMessageResponse(
message_id=message.id,
status=message.status,
recipient=request.phone_numbers[0],
)
async def send_batch(self, requests: list[SendMessageRequest]) -> BatchSendResponse:
messages = []
errors = []
successful = 0
for req in requests:
try:
result = await self.send_message(req)
messages.append(result)
successful += 1
except Exception as e:
errors.append({"recipient": req.phone_numbers[0] if req.phone_numbers else None, "error": str(e)})
return BatchSendResponse(
total_requested=len(requests),
successful=successful,
failed=len(errors),
messages=messages,
errors=errors,
)Routers
Messages Router
# app/routers/messages.py
from fastapi import APIRouter, Depends, status
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.dependencies import get_sent_client
from app.models.schemas import SendMessageRequest, SendMessageResponse, BatchSendResponse
from app.services.sent_service import SentService
from app.config import get_settings
limiter = Limiter(key_func=get_remote_address)
router = APIRouter(
prefix="/api/messages",
tags=["Messages"],
responses={401: {"description": "Unauthorized"}, 429: {"description": "Rate limit exceeded"}},
)
@router.post("/send", response_model=SendMessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit(lambda: get_settings().rate_limit)
async def send_message(
request: SendMessageRequest,
client=Depends(get_sent_client),
) -> SendMessageResponse:
service = SentService(client)
return await service.send_message(request)
@router.post("/send-batch", response_model=BatchSendResponse, status_code=status.HTTP_200_OK)
@limiter.limit(lambda: get_settings().rate_limit)
async def send_batch(
requests: list[SendMessageRequest],
client=Depends(get_sent_client),
) -> BatchSendResponse:
service = SentService(client)
return await service.send_batch(requests)Webhooks Router
# app/routers/webhooks.py
import json
import logging
from typing import Any
from fastapi import APIRouter, Depends, BackgroundTasks, status
from app.dependencies import verify_webhook_signature
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/webhooks", tags=["Webhooks"])
async def process_webhook_event(event_data: dict[str, Any]) -> None:
event_type = event_data.get("type")
data = event_data.get("data", {})
logger.info(f"Processing webhook event: {event_type}")
match event_type:
case "message.status.updated":
logger.info(f"Message {data.get('id')} status updated to {data.get('status')}")
case "message.delivered":
logger.info(f"Message {data.get('id')} delivered successfully")
case "message.failed":
logger.error(f"Message {data.get('id')} failed: {data.get('error', {}).get('message')}")
case "message.read":
logger.info(f"Message {data.get('id')} was read by recipient")
case _:
logger.warning(f"Unhandled webhook event type: {event_type}")
@router.post("/sent", status_code=status.HTTP_200_OK)
async def handle_webhook(
background_tasks: BackgroundTasks,
payload: bytes = Depends(verify_webhook_signature),
) -> dict[str, bool]:
event_data = json.loads(payload)
background_tasks.add_task(process_webhook_event, event_data)
return {"received": True}Health Router
# app/routers/health.py
from datetime import datetime
from fastapi import APIRouter, Depends, status
from sent_dm import AsyncSentDm
from app.dependencies import get_sent_client
from app.config import get_settings
from app.models.schemas import HealthCheckResponse
router = APIRouter(prefix="/health", tags=["Health"])
@router.get("", response_model=HealthCheckResponse)
async def health_check(client: AsyncSentDm = Depends(get_sent_client)) -> HealthCheckResponse:
settings = get_settings()
services: dict[str, str] = {"api": "up", "sent_dm": "up"}
return HealthCheckResponse(
status="healthy", # type: ignore
version=settings.app_version,
timestamp=datetime.utcnow(),
services=services, # type: ignore
)
@router.get("/ready", status_code=status.HTTP_200_OK)
async def readiness() -> dict[str, str]:
return {"status": "ready"}
@router.get("/live", status_code=status.HTTP_200_OK)
async def liveness() -> dict[str, str]:
return {"status": "alive"}Main Application
# app/main.py
import logging
from fastapi import FastAPI
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from app.config import get_settings
from app.lifespan import lifespan
from app.middleware import RequestIdMiddleware, LoggingMiddleware
from app.exceptions import register_exception_handlers
from app.routers import messages, webhooks, health
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
def create_application() -> FastAPI:
settings = get_settings()
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
description="FastAPI integration with Sent DM for messaging",
lifespan=lifespan,
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
)
app.add_middleware(RequestIdMiddleware)
app.add_middleware(LoggingMiddleware)
limiter = Limiter(key_func=lambda: "global")
app.state.limiter = limiter
app.add_middleware(SlowAPIMiddleware)
register_exception_handlers(app)
app.include_router(health.router)
app.include_router(messages.router)
app.include_router(webhooks.router)
return app
app = create_application()
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=get_settings().debug)# app/routers/__init__.py
from app.routers import messages, webhooks, health
__all__ = ["messages", "webhooks", "health"]Testing
# tests/conftest.py
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
from app.main import create_application
from app.config import Settings, get_settings
def get_test_settings() -> Settings:
return Settings(
sent_dm_api_key="your_api_key_here",
sent_dm_webhook_secret="your_webhook_secret_here",
debug=True,
log_level="DEBUG",
)
@pytest.fixture
def app():
app = create_application()
app.dependency_overrides[get_settings] = get_test_settings
return app
@pytest.fixture
def client(app) -> TestClient:
return TestClient(app)
@pytest_asyncio.fixture
async def async_client(app) -> AsyncClient:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
yield client# tests/test_messages.py
import pytest
from unittest.mock import AsyncMock, patch
from fastapi import status
from sent_dm import APIError
@pytest.mark.asyncio
async def test_send_message_success(async_client):
mock_response = {"data": {"messages": [{"id": "msg_123", "status": "pending"}]}}
with patch("sent_dm.AsyncSentDm.messages.send", new_callable=AsyncMock) as mock_send:
mock_send.return_value = mock_response
response = await async_client.post(
"/api/messages/send",
json={
"phone_numbers": ["+1234567890"],
"template": {"id": "template-123", "name": "welcome", "parameters": {"name": "John"}},
"channels": ["whatsapp"],
},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["message_id"] == "msg_123"
@pytest.mark.asyncio
async def test_send_message_validation_error(async_client):
response = await async_client.post("/api/messages/send", json={"phone_numbers": ["invalid"]})
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_send_message_api_error(async_client):
with patch("sent_dm.AsyncSentDm.messages.send", new_callable=AsyncMock) as mock_send:
mock_send.side_effect = APIError(status=400, name="BadRequestError", message="Invalid phone number", headers={})
response = await async_client.post(
"/api/messages/send",
json={"phone_numbers": ["+1234567890"], "template": {"id": "template-123", "name": "welcome"}},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.asyncio
async def test_health_check(async_client):
response = await async_client.get("/health")
assert response.status_code == status.HTTP_200_OK
assert "services" in response.json()# tests/test_webhooks.py
import json
import hmac
import hashlib
import pytest
from unittest.mock import patch
from fastapi import status
def generate_signature(payload: str, secret: str) -> str:
return hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()
@pytest.mark.asyncio
async def test_webhook_success(async_client):
payload = {"type": "message.delivered", "data": {"id": "msg_123", "status": "delivered"}}
with patch("sent_dm.AsyncSentDm.webhooks.verify_signature") as mock_verify:
mock_verify.return_value = True
response = await async_client.post(
"/webhooks/sent",
content=json.dumps(payload),
headers={"X-Webhook-Signature": "valid_signature"},
)
assert response.status_code == status.HTTP_200_OK
@pytest.mark.asyncio
async def test_webhook_missing_signature(async_client):
response = await async_client.post("/webhooks/sent", json={"type": "test"})
assert response.status_code == status.HTTP_401_UNAUTHORIZEDRunning the Application
# Development
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# Production (using Gunicorn with Uvicorn workers)
gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
# With Docker
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY app/ ./app/
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]Environment Variables
| Variable | Required | Description |
|---|---|---|
SENT_DM_API_KEY | Yes | Your Sent DM API key |
SENT_DM_WEBHOOK_SECRET | No | Secret for webhook signature verification |
SENT_DM_BASE_URL | No | Custom API base URL (for testing) |
DEBUG | No | Enable debug mode (default: false) |
LOG_LEVEL | No | Logging level (default: INFO) |
RATE_LIMIT | No | Rate limit string (default: 100/minute) |
Next Steps
- Handle errors in your application
- Configure webhooks to receive delivery status
- Learn about best practices for production deployments
- Explore Pydantic v2 documentation for advanced validation
- Check FastAPI documentation for more patterns