Python SDK

FastAPI Integration

Complete FastAPI integration with Pydantic v2 models, dependency injection, lifespan context management, and production-ready patterns.

This guide uses modern FastAPI patterns including lifespan context managers, Pydantic v2 Settings, and modular APIRouter architecture.

Project Structure

my_fastapi_app/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI app factory
│   ├── config.py               # Pydantic Settings
│   ├── dependencies.py         # Shared dependencies
│   ├── lifespan.py             # Lifespan context manager
│   ├── middleware.py           # Custom middleware
│   ├── exceptions.py           # Custom exceptions & handlers
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── messages.py         # Message endpoints
│   │   ├── webhooks.py         # Webhook handlers
│   │   └── health.py           # Health checks
│   ├── services/
│   │   ├── __init__.py
│   │   └── sent_service.py     # Sent client service
│   ├── models/
│   │   ├── __init__.py
│   │   └── schemas.py          # Pydantic models
│   └── utils/
│       ├── __init__.py
│       └── logging.py          # Logging configuration
├── tests/
│   ├── __init__.py
│   ├── conftest.py             # Pytest fixtures
│   ├── test_messages.py
│   └── test_webhooks.py
├── .env
├── pyproject.toml
└── requirements.txt

Installation

pip install sentdm fastapi uvicorn pydantic-settings slowapi

# For improved async HTTP performance
pip install sentdm[aiohttp]

# Development dependencies
pip install pytest pytest-asyncio httpx

Configuration

# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field, field_validator
from functools import lru_cache


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    app_name: str = Field(default="Sent DM API", alias="APP_NAME")
    app_version: str = Field(default="1.0.0", alias="APP_VERSION")
    debug: bool = Field(default=False, alias="DEBUG")
    sent_dm_api_key: str = Field(alias="SENT_DM_API_KEY")
    sent_dm_webhook_secret: str | None = Field(default=None, alias="SENT_DM_WEBHOOK_SECRET")
    sent_dm_base_url: str | None = Field(default=None, alias="SENT_DM_BASE_URL")
    rate_limit: str = Field(default="100/minute", alias="RATE_LIMIT")
    log_level: str = Field(default="INFO", alias="LOG_LEVEL")

    @field_validator("sent_dm_api_key")
    @classmethod
    def validate_api_key(cls, v: str) -> str:
        if not v or len(v) < 10:
            raise ValueError("SENT_DM_API_KEY must be a valid API key")
        return v


@lru_cache
def get_settings() -> Settings:
    return Settings()
# .env
SENT_DM_API_KEY=your_api_key_here
SENT_DM_WEBHOOK_SECRET=your_webhook_secret
DEBUG=false
LOG_LEVEL=INFO
RATE_LIMIT=100/minute

Async Client Manager with Lifespan

# app/lifespan.py
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from fastapi import FastAPI
from sent_dm import AsyncSentDm
from app.config import get_settings


class SentClientManager:
    def __init__(self) -> None:
        self._client: AsyncSentDm | None = None

    async def initialize(self) -> None:
        settings = get_settings()
        self._client = AsyncSentDm(
            api_key=settings.sent_dm_api_key,
            base_url=settings.sent_dm_base_url,
        )
        await self._client.__aenter__()

    async def cleanup(self) -> None:
        if self._client:
            await self._client.__aexit__(None, None, None)
            self._client = None

    @property
    def client(self) -> AsyncSentDm:
        if self._client is None:
            raise RuntimeError("Sent client not initialized")
        return self._client


sent_manager = SentClientManager()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    await sent_manager.initialize()
    yield
    await sent_manager.cleanup()

Schemas

# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator
from typing import Literal, Any
from datetime import datetime


class TemplateRequest(BaseModel):
    id: str = Field(..., min_length=1)
    name: str = Field(..., min_length=1)
    parameters: dict[str, str] = Field(default_factory=dict)


class SendMessageRequest(BaseModel):
    phone_numbers: list[str] = Field(..., min_length=1, max_length=100)
    template: TemplateRequest
    channels: list[Literal["sms", "whatsapp", "telegram"]] = Field(default=["whatsapp"])
    test_mode: bool = Field(default=False)

    @field_validator("phone_numbers")
    @classmethod
    def validate_phone_numbers(cls, v: list[str]) -> list[str]:
        for phone in v:
            if not phone.startswith("+"):
                raise ValueError(f"Phone number '{phone}' must start with '+' (E.164 format)")
        return v


class SendMessageResponse(BaseModel):
    message_id: str
    status: str
    recipient: str
    sent_at: datetime = Field(default_factory=datetime.utcnow)


class BatchSendResponse(BaseModel):
    total_requested: int
    successful: int
    failed: int
    messages: list[SendMessageResponse]
    errors: list[dict[str, Any]] = Field(default_factory=list)


class WebhookEvent(BaseModel):
    type: str
    data: dict[str, Any]
    timestamp: datetime | None = Field(default=None)


class HealthCheckResponse(BaseModel):
    status: Literal["healthy", "unhealthy"]
    version: str
    timestamp: datetime
    services: dict[str, Literal["up", "down"]]

Dependencies

# app/dependencies.py
from fastapi import Request, Depends, Header, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sent_dm import AsyncSentDm
from app.config import Settings, get_settings
from app.lifespan import sent_manager


security = HTTPBearer(auto_error=False)


def get_sent_client() -> AsyncSentDm:
    return sent_manager.client


async def verify_webhook_signature(
    request: Request,
    x_webhook_signature: str | None = Header(None),
    settings: Settings = Depends(get_settings),
) -> bytes:
    if not x_webhook_signature:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing X-Webhook-Signature header",
        )

    if not settings.sent_dm_webhook_secret:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Webhook secret not configured",
        )

    payload = await request.body()
    client = sent_manager.client
    is_valid = client.webhooks.verify_signature(
        payload=payload.decode(),
        signature=x_webhook_signature,
        secret=settings.sent_dm_webhook_secret,
    )

    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid webhook signature",
        )

    return payload


async def get_optional_token(
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> str | None:
    return credentials.credentials if credentials else None

Exceptions

# app/exceptions.py
from fastapi import Request, FastAPI
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from sent_dm import APIError
import logging

logger = logging.getLogger(__name__)


class SentServiceException(Exception):
    def __init__(self, message: str, status_code: int = 500):
        self.message = message
        self.status_code = status_code
        super().__init__(self.message)


async def sent_api_exception_handler(request: Request, exc: APIError) -> JSONResponse:
    logger.error(
        "Sent API Error",
        extra={"status": exc.status, "error": exc.name, "path": request.url.path},
    )

    status_code = {400: 400, 401: 401, 403: 403, 404: 404, 422: 422, 429: 429}.get(exc.status, 500)

    return JSONResponse(
        status_code=status_code,
        content={"error": exc.name, "message": exc.message, "status_code": status_code},
    )


async def validation_exception_handler(
    request: Request, exc: RequestValidationError
) -> JSONResponse:
    errors = [
        {"field": ".".join(str(x) for x in error["loc"]), "message": error["msg"], "type": error["type"]}
        for error in exc.errors()
    ]

    return JSONResponse(
        status_code=422,
        content={"error": "Validation Error", "message": "Request validation failed", "details": errors},
    )


async def http_exception_handler(request: Request, exc: StarletteHTTPException) -> JSONResponse:
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": "HTTP Error", "message": exc.detail, "status_code": exc.status_code},
    )


def register_exception_handlers(app: FastAPI) -> None:
    app.add_exception_handler(APIError, sent_api_exception_handler)
    app.add_exception_handler(RequestValidationError, validation_exception_handler)
    app.add_exception_handler(StarletteHTTPException, http_exception_handler)

Middleware

# app/middleware.py
import time
import logging
import uuid
from typing import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)


class RequestIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response


class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        start_time = time.time()
        request_id = getattr(request.state, "request_id", "unknown")

        logger.info(
            "Request started",
            extra={"request_id": request_id, "method": request.method, "path": request.url.path},
        )

        try:
            response = await call_next(request)
        except Exception as exc:
            logger.error("Request failed", extra={"request_id": request_id, "error": str(exc)})
            raise

        process_time = time.time() - start_time
        response.headers["X-Process-Time"] = str(process_time)

        logger.info(
            "Request completed",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "status_code": response.status_code,
                "duration_ms": round(process_time * 1000, 2),
            },
        )

        return response

Service Layer

# app/services/sent_service.py
import logging
from typing import Any
from sent_dm import AsyncSentDm
from app.models.schemas import SendMessageRequest, SendMessageResponse, BatchSendResponse

logger = logging.getLogger(__name__)


class SentService:
    def __init__(self, client: AsyncSentDm):
        self._client = client

    async def send_message(self, request: SendMessageRequest) -> SendMessageResponse:
        response = await self._client.messages.send(
            to=request.phone_numbers,
            template={
                "id": request.template.id,
                "name": request.template.name,
                "parameters": request.template.parameters,
            },
            channels=request.channels,
            test_mode=request.test_mode,
        )

        message = response.data.messages[0]

        logger.info(
            "Message sent",
            extra={"message_id": message.id, "recipient": request.phone_numbers[0], "status": message.status},
        )

        return SendMessageResponse(
            message_id=message.id,
            status=message.status,
            recipient=request.phone_numbers[0],
        )

    async def send_batch(self, requests: list[SendMessageRequest]) -> BatchSendResponse:
        messages = []
        errors = []
        successful = 0

        for req in requests:
            try:
                result = await self.send_message(req)
                messages.append(result)
                successful += 1
            except Exception as e:
                errors.append({"recipient": req.phone_numbers[0] if req.phone_numbers else None, "error": str(e)})

        return BatchSendResponse(
            total_requested=len(requests),
            successful=successful,
            failed=len(errors),
            messages=messages,
            errors=errors,
        )

Routers

Messages Router

# app/routers/messages.py
from fastapi import APIRouter, Depends, status
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.dependencies import get_sent_client
from app.models.schemas import SendMessageRequest, SendMessageResponse, BatchSendResponse
from app.services.sent_service import SentService
from app.config import get_settings

limiter = Limiter(key_func=get_remote_address)

router = APIRouter(
    prefix="/api/messages",
    tags=["Messages"],
    responses={401: {"description": "Unauthorized"}, 429: {"description": "Rate limit exceeded"}},
)


@router.post("/send", response_model=SendMessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit(lambda: get_settings().rate_limit)
async def send_message(
    request: SendMessageRequest,
    client=Depends(get_sent_client),
) -> SendMessageResponse:
    service = SentService(client)
    return await service.send_message(request)


@router.post("/send-batch", response_model=BatchSendResponse, status_code=status.HTTP_200_OK)
@limiter.limit(lambda: get_settings().rate_limit)
async def send_batch(
    requests: list[SendMessageRequest],
    client=Depends(get_sent_client),
) -> BatchSendResponse:
    service = SentService(client)
    return await service.send_batch(requests)

Webhooks Router

# app/routers/webhooks.py
import json
import logging
from typing import Any
from fastapi import APIRouter, Depends, BackgroundTasks, status
from app.dependencies import verify_webhook_signature

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/webhooks", tags=["Webhooks"])


async def process_webhook_event(event_data: dict[str, Any]) -> None:
    event_type = event_data.get("type")
    data = event_data.get("data", {})

    logger.info(f"Processing webhook event: {event_type}")

    match event_type:
        case "message.status.updated":
            logger.info(f"Message {data.get('id')} status updated to {data.get('status')}")
        case "message.delivered":
            logger.info(f"Message {data.get('id')} delivered successfully")
        case "message.failed":
            logger.error(f"Message {data.get('id')} failed: {data.get('error', {}).get('message')}")
        case "message.read":
            logger.info(f"Message {data.get('id')} was read by recipient")
        case _:
            logger.warning(f"Unhandled webhook event type: {event_type}")


@router.post("/sent", status_code=status.HTTP_200_OK)
async def handle_webhook(
    background_tasks: BackgroundTasks,
    payload: bytes = Depends(verify_webhook_signature),
) -> dict[str, bool]:
    event_data = json.loads(payload)
    background_tasks.add_task(process_webhook_event, event_data)
    return {"received": True}

Health Router

# app/routers/health.py
from datetime import datetime
from fastapi import APIRouter, Depends, status
from sent_dm import AsyncSentDm
from app.dependencies import get_sent_client
from app.config import get_settings
from app.models.schemas import HealthCheckResponse

router = APIRouter(prefix="/health", tags=["Health"])


@router.get("", response_model=HealthCheckResponse)
async def health_check(client: AsyncSentDm = Depends(get_sent_client)) -> HealthCheckResponse:
    settings = get_settings()
    services: dict[str, str] = {"api": "up", "sent_dm": "up"}

    return HealthCheckResponse(
        status="healthy",  # type: ignore
        version=settings.app_version,
        timestamp=datetime.utcnow(),
        services=services,  # type: ignore
    )


@router.get("/ready", status_code=status.HTTP_200_OK)
async def readiness() -> dict[str, str]:
    return {"status": "ready"}


@router.get("/live", status_code=status.HTTP_200_OK)
async def liveness() -> dict[str, str]:
    return {"status": "alive"}

Main Application

# app/main.py
import logging
from fastapi import FastAPI
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from app.config import get_settings
from app.lifespan import lifespan
from app.middleware import RequestIdMiddleware, LoggingMiddleware
from app.exceptions import register_exception_handlers
from app.routers import messages, webhooks, health

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)


def create_application() -> FastAPI:
    settings = get_settings()

    app = FastAPI(
        title=settings.app_name,
        version=settings.app_version,
        description="FastAPI integration with Sent DM for messaging",
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None,
    )

    app.add_middleware(RequestIdMiddleware)
    app.add_middleware(LoggingMiddleware)

    limiter = Limiter(key_func=lambda: "global")
    app.state.limiter = limiter
    app.add_middleware(SlowAPIMiddleware)

    register_exception_handlers(app)

    app.include_router(health.router)
    app.include_router(messages.router)
    app.include_router(webhooks.router)

    return app


app = create_application()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=get_settings().debug)
# app/routers/__init__.py
from app.routers import messages, webhooks, health

__all__ = ["messages", "webhooks", "health"]

Testing

# tests/conftest.py
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
from app.main import create_application
from app.config import Settings, get_settings


def get_test_settings() -> Settings:
    return Settings(
        sent_dm_api_key="your_api_key_here",
        sent_dm_webhook_secret="your_webhook_secret_here",
        debug=True,
        log_level="DEBUG",
    )


@pytest.fixture
def app():
    app = create_application()
    app.dependency_overrides[get_settings] = get_test_settings
    return app


@pytest.fixture
def client(app) -> TestClient:
    return TestClient(app)


@pytest_asyncio.fixture
async def async_client(app) -> AsyncClient:
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        yield client
# tests/test_messages.py
import pytest
from unittest.mock import AsyncMock, patch
from fastapi import status
from sent_dm import APIError


@pytest.mark.asyncio
async def test_send_message_success(async_client):
    mock_response = {"data": {"messages": [{"id": "msg_123", "status": "pending"}]}}

    with patch("sent_dm.AsyncSentDm.messages.send", new_callable=AsyncMock) as mock_send:
        mock_send.return_value = mock_response
        response = await async_client.post(
            "/api/messages/send",
            json={
                "phone_numbers": ["+1234567890"],
                "template": {"id": "template-123", "name": "welcome", "parameters": {"name": "John"}},
                "channels": ["whatsapp"],
            },
        )

    assert response.status_code == status.HTTP_200_OK
    data = response.json()
    assert data["message_id"] == "msg_123"


@pytest.mark.asyncio
async def test_send_message_validation_error(async_client):
    response = await async_client.post("/api/messages/send", json={"phone_numbers": ["invalid"]})
    assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY


@pytest.mark.asyncio
async def test_send_message_api_error(async_client):
    with patch("sent_dm.AsyncSentDm.messages.send", new_callable=AsyncMock) as mock_send:
        mock_send.side_effect = APIError(status=400, name="BadRequestError", message="Invalid phone number", headers={})
        response = await async_client.post(
            "/api/messages/send",
            json={"phone_numbers": ["+1234567890"], "template": {"id": "template-123", "name": "welcome"}},
        )

    assert response.status_code == status.HTTP_400_BAD_REQUEST


@pytest.mark.asyncio
async def test_health_check(async_client):
    response = await async_client.get("/health")
    assert response.status_code == status.HTTP_200_OK
    assert "services" in response.json()
# tests/test_webhooks.py
import json
import hmac
import hashlib
import pytest
from unittest.mock import patch
from fastapi import status


def generate_signature(payload: str, secret: str) -> str:
    return hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()


@pytest.mark.asyncio
async def test_webhook_success(async_client):
    payload = {"type": "message.delivered", "data": {"id": "msg_123", "status": "delivered"}}

    with patch("sent_dm.AsyncSentDm.webhooks.verify_signature") as mock_verify:
        mock_verify.return_value = True
        response = await async_client.post(
            "/webhooks/sent",
            content=json.dumps(payload),
            headers={"X-Webhook-Signature": "valid_signature"},
        )

    assert response.status_code == status.HTTP_200_OK


@pytest.mark.asyncio
async def test_webhook_missing_signature(async_client):
    response = await async_client.post("/webhooks/sent", json={"type": "test"})
    assert response.status_code == status.HTTP_401_UNAUTHORIZED

Running the Application

# Development
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# Production (using Gunicorn with Uvicorn workers)
gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

# With Docker
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY app/ ./app/
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Environment Variables

VariableRequiredDescription
SENT_DM_API_KEYYesYour Sent DM API key
SENT_DM_WEBHOOK_SECRETNoSecret for webhook signature verification
SENT_DM_BASE_URLNoCustom API base URL (for testing)
DEBUGNoEnable debug mode (default: false)
LOG_LEVELNoLogging level (default: INFO)
RATE_LIMITNoRate limit string (default: 100/minute)

Next Steps

On this page