Python SDK

Celery Integration

Complete Celery integration with task base classes, retry strategies, workflow orchestration, monitoring, and database integration for production workloads.

This guide covers Celery 5.3+ best practices with Redis/RabbitMQ broker, result backend, task signals, and Flower monitoring.

Project Structure

project/
├── celery_app/
│   ├── __init__.py
│   ├── app.py                 # Celery app configuration
│   ├── config.py              # Configuration classes
│   ├── base_task.py           # Base task with retry logic
│   ├── rate_limiter.py        # Token bucket rate limiting
│   └── exceptions.py          # Custom exceptions
├── tasks/
│   ├── __init__.py
│   ├── messages.py            # Message sending tasks
│   ├── bulk.py                # Bulk operation tasks
│   └── workflows.py           # Canvas workflow definitions
├── models/
│   └── message_log.py         # Database models
├── tests/
│   ├── conftest.py            # pytest fixtures
│   └── test_tasks.py          # Task tests
├── docker-compose.yml         # Stack configuration
└── requirements.txt

Installation

pip install celery[redis] sentdm sqlalchemy psycopg2-binary flower prometheus-client

Configuration

Celery Configuration

# celery_app/config.py
import os
from typing import Dict, Any

class CeleryConfig:
    """Base Celery configuration shared by every environment.

    Attributes that call ``os.getenv`` are resolved once, when this class
    body executes (i.e. when the module is imported), so the environment
    must be set before the import happens.
    """

    # --- Broker and result backend -------------------------------------
    broker_url: str = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
    broker_connection_retry_on_startup: bool = True
    result_backend: str = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
    result_expires: int = 3600 * 24  # 24 hours, in seconds

    # --- Serialization --------------------------------------------------
    task_serializer: str = "json"
    accept_content: list = ["json"]

    # --- Execution limits and worker behaviour -------------------------
    task_track_started: bool = True
    task_time_limit: int = 300
    task_soft_time_limit: int = 240  # kept below task_time_limit
    task_acks_late: bool = True
    worker_prefetch_multiplier: int = 1
    worker_max_tasks_per_child: int = 1000

    # --- Routing ---------------------------------------------------------
    # Maps a task-name glob to its routing options; each value is itself a
    # dict, hence Dict[str, Dict[str, str]] rather than Dict[str, str].
    task_routes: Dict[str, Dict[str, str]] = {
        "tasks.messages.*": {"queue": "messages"},
        "tasks.bulk.*": {"queue": "bulk"},
        "tasks.workflows.*": {"queue": "workflows"},
    }

    # --- Periodic tasks (celery beat) ------------------------------------
    # NOTE(review): the "tasks.scheduled.*" names match none of the
    # task_routes patterns above — confirm which queue/worker serves them.
    beat_schedule: Dict[str, Any] = {
        "cleanup-old-logs": {
            "task": "tasks.scheduled.cleanup_message_logs",
            "schedule": 3600 * 24,
            "args": (30,),
        },
        "retry-failed-messages": {
            "task": "tasks.scheduled.retry_failed_messages",
            "schedule": 300,
        },
    }

    # --- Application-level settings --------------------------------------
    database_url: str = os.getenv("DATABASE_URL", "postgresql://localhost/celery_tasks")
    sent_dm_api_key: str = os.getenv("SENT_DM_API_KEY", "")

class ProductionConfig(CeleryConfig):
    """Settings layered on top of ``CeleryConfig`` for production use."""

    # Event and remote-control switches.
    task_send_sent_event: bool = True
    worker_send_task_events: bool = True
    worker_enable_remote_control: bool = True

    # Options handed to the broker transport.
    broker_transport_options: Dict[str, Any] = dict(
        visibility_timeout=12 * 60 * 60,  # 43200
        queue_order_strategy="priority",
    )

def get_config():
    """Return a configuration instance chosen by the ``CELERY_ENV`` variable.

    ``"production"`` selects ``ProductionConfig``; any other value, including
    the default ``"development"``, selects ``CeleryConfig``.
    """
    if os.getenv("CELERY_ENV", "development") == "production":
        return ProductionConfig()
    return CeleryConfig()

Celery App Factory

# celery_app/app.py
from celery import Celery
from .config import get_config

def create_celery_app(app_name: str = "sent_dm_tasks") -> Celery:
    """Build a Celery application named *app_name*.

    The application is configured from the object returned by
    ``get_config()`` and asked to autodiscover the project's task modules.
    """
    settings = get_config()
    task_modules = ["tasks.messages", "tasks.bulk", "tasks.workflows"]

    celery = Celery(app_name)
    celery.config_from_object(settings)
    celery.autodiscover_tasks(task_modules)
    return celery


# Module-level instance; this is what ``celery -A celery_app.app ...`` loads.
app = create_celery_app()

Custom Exceptions

# celery_app/exceptions.py
from typing import Optional, Dict, Any

class SentDMTaskError(Exception):
    """Base exception for SentDM Celery tasks.

    Args:
        message: Human-readable description; also used as ``str(exc)``.
        error_code: Machine-readable code; defaults to ``"TASK_ERROR"``.
        details: Optional extra context; defaults to an empty dict.
        retryable: Whether retrying the failed operation makes sense.
    """
    def __init__(self, message: str, error_code: Optional[str] = None,
                 details: Optional[Dict] = None, retryable: bool = False):
        super().__init__(message)
        self.message = message
        self.error_code = error_code or "TASK_ERROR"
        self.details = details or {}
        self.retryable = retryable


class NonRetryableError(SentDMTaskError):
    """Error that should not be retried (4xx errors)."""
    def __init__(self, message: str, error_code: str = "NON_RETRYABLE", details=None):
        super().__init__(message, error_code, details, retryable=False)


class ValidationError(NonRetryableError):
    """Task input failed validation; retrying the same input cannot succeed.

    The message tasks and the test-suite import this name from this module,
    so it must be defined here.
    """
    def __init__(self, message: str, error_code: str = "VALIDATION_ERROR", details=None):
        super().__init__(message, error_code, details)


class RateLimitExceeded(SentDMTaskError):
    """Rate limit exceeded - retry with backoff.

    Args:
        message: Human-readable description.
        retry_after: Suggested delay before retrying; stored both on the
            instance and under ``details["retry_after"]``.
    """
    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        super().__init__(message, "RATE_LIMIT_EXCEEDED", {"retry_after": retry_after}, True)
        self.retry_after = retry_after


class TemporaryError(SentDMTaskError):
    """Temporary error - should be retried."""
    def __init__(self, message: str, error_code: str = "TEMPORARY_ERROR", details=None):
        super().__init__(message, error_code, details, retryable=True)

Token Bucket Rate Limiter

# celery_app/rate_limiter.py
import time
import threading
from typing import Optional, Dict
from dataclasses import dataclass

@dataclass
class TokenBucket:
    """Token bucket for rate limiting.

    Attributes:
        capacity: Maximum number of tokens the bucket can hold.
        tokens: Tokens currently available.
        fill_rate: Tokens added per second of elapsed wall-clock time.
        last_update: ``time.time()`` timestamp of the last refill.
    """
    capacity: int
    tokens: float
    fill_rate: float
    last_update: float

    def _level_at(self, now: float) -> float:
        """Return the token level the bucket would have at time *now*."""
        # Clamp so a backwards clock step can never drain the bucket.
        elapsed = max(0.0, now - self.last_update)
        return min(self.capacity, self.tokens + elapsed * self.fill_rate)

    def consume(self, tokens: int = 1) -> bool:
        """Refill the bucket, then try to take *tokens* from it.

        Returns:
            True if the tokens were taken, False if too few were available
            (the refill is still recorded in that case).
        """
        now = time.time()
        self.tokens = self._level_at(now)
        self.last_update = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def get_wait_time(self, tokens: int = 1) -> float:
        """Return the seconds until *tokens* could be consumed.

        Does not modify the bucket.

        Returns:
            ``0.0`` if enough tokens are available now, ``float("inf")`` if
            the request can never be met (more tokens than the capacity, or
            a non-positive fill rate), otherwise the estimated wait.
        """
        deficit = tokens - self._level_at(time.time())
        if deficit <= 0:
            return 0.0
        if tokens > self.capacity or self.fill_rate <= 0:
            return float("inf")
        return deficit / self.fill_rate


class RateLimiter:
    """Thread-safe rate limiter with multiple buckets.

    Buckets live in this object's in-memory dict, so limits apply only
    within the process that owns the instance.
    """
    def __init__(self):
        self._buckets: Dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def acquire(self, key: str, capacity: int, fill_rate: float,
                tokens: int = 1, timeout: Optional[float] = None) -> bool:
        """Block until *tokens* are taken from the bucket for *key*.

        Args:
            key: Bucket name. The bucket is created full on first use;
                *capacity* and *fill_rate* are ignored for an existing key.
            capacity: Bucket size used when creating the bucket.
            fill_rate: Refill rate (tokens per second) used on creation.
            tokens: Number of tokens to take.
            timeout: Maximum seconds to wait; ``None`` waits indefinitely,
                ``0`` means "do not wait".

        Returns:
            True once the tokens were taken; False if that cannot happen
            within *timeout*, or can never happen at all.
        """
        # Create-if-missing must be atomic, otherwise two threads can each
        # build their own bucket for the same key.
        with self._lock:
            bucket = self._buckets.get(key)
            if bucket is None:
                bucket = TokenBucket(capacity, capacity, fill_rate, time.time())
                self._buckets[key] = bucket

        start_time = time.time()
        while True:
            with self._lock:
                if bucket.consume(tokens):
                    return True
                wait_time = bucket.get_wait_time(tokens)
            if wait_time == float("inf"):
                # Waiting cannot help; fail instead of spinning forever.
                return False
            # ``is not None`` so that timeout=0 is honoured as "no waiting".
            if timeout is not None and (time.time() - start_time + wait_time > timeout):
                return False
            if wait_time > 0:
                # Sleep in short slices so the bucket is re-checked promptly.
                time.sleep(min(wait_time, 0.1))


rate_limiter = RateLimiter()

Base Task Class

# celery_app/base_task.py
import logging
from typing import Optional

from celery import Task
from celery.exceptions import MaxRetriesExceededError, SoftTimeLimitExceeded

from .exceptions import NonRetryableError, RateLimitExceeded, TemporaryError
from .rate_limiter import rate_limiter

logger = logging.getLogger(__name__)

class SentDMBaseTask(Task):
    """Base task class with retry logic, rate limiting, and error handling.

    Subclasses (or tasks declared with ``base=SentDMBaseTask``) opt in to
    rate limiting by setting ``rate_limit_key``; with the default ``None``
    no limit is applied.
    """
    abstract = True

    # Retry defaults.
    # NOTE(review): per the Celery docs, retry_backoff / retry_backoff_max /
    # retry_jitter only affect automatic retries (autoretry_for), which is
    # not configured here — confirm they are meant to be active.
    max_retries = 3
    default_retry_delay = 60
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

    # Token-bucket parameters handed to rate_limiter.acquire().
    rate_limit_key: Optional[str] = None
    rate_limit_capacity: int = 100
    rate_limit_fill_rate: float = 10.0

    soft_time_limit = 240
    time_limit = 300

    # Class-level default so the property works even if __init__ is skipped.
    _sent_client = None

    def __init__(self):
        self._sent_client = None

    @property
    def sent_client(self):
        """Lazy initialization of SentDM client.

        Raises:
            NonRetryableError: If ``SENT_DM_API_KEY`` is unset or empty.
        """
        if self._sent_client is None:
            from sent_dm import SentDm
            import os
            api_key = os.getenv("SENT_DM_API_KEY")
            if not api_key:
                raise NonRetryableError("SENT_DM_API_KEY not configured")
            self._sent_client = SentDm(api_key)
        return self._sent_client

    def apply_rate_limit(self, tokens: int = 1) -> None:
        """Take *tokens* from this task's bucket, waiting up to 30 seconds.

        Does nothing when ``rate_limit_key`` is not set.

        Raises:
            RateLimitExceeded: If the tokens could not be acquired in time.
        """
        if not self.rate_limit_key:
            return
        acquired = rate_limiter.acquire(
            key=self.rate_limit_key,
            capacity=self.rate_limit_capacity,
            fill_rate=self.rate_limit_fill_rate,
            tokens=tokens,
            timeout=30,
        )
        if not acquired:
            rate = self.rate_limit_fill_rate
            # At least one second: int(1 / 10.0) would be 0, and a zero
            # fill rate must not raise ZeroDivisionError.
            if rate > 0:
                retry_after = max(1, int(tokens / rate))
            else:
                retry_after = self.default_retry_delay
            raise RateLimitExceeded(retry_after=retry_after)

    def __call__(self, *args, **kwargs):
        """Apply the rate limit, then run the task body.

        Celery executes a task by calling the task object, so the hook has
        to live in ``__call__`` to take effect.
        """
        self.apply_rate_limit()
        return super().__call__(*args, **kwargs)

    def call(self, *args, **kwargs):
        """Backward-compatible alias for calling the task object."""
        return self(*args, **kwargs)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log a warning with the current retry count, then defer to Celery."""
        logger.warning(
            "Task %s retry %s/%s: %s",
            task_id, self.request.retries, self.max_retries, exc,
        )
        super().on_retry(exc, task_id, args, kwargs, einfo)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failure (with structured ``extra`` data), then defer to Celery."""
        logger.error("Task %s failed: %s", task_id, exc, extra={
            "task_id": task_id, "exception": str(exc),
        })
        super().on_failure(exc, task_id, args, kwargs, einfo)

    def on_success(self, retval, task_id, args, kwargs):
        """Log successful completion, then defer to Celery."""
        logger.info("Task %s completed successfully", task_id)
        super().on_success(retval, task_id, args, kwargs)

Message Tasks

# tasks/messages.py
import logging
from typing import Optional, Dict, Any
from datetime import datetime
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery_app.base_task import SentDMBaseTask
from celery_app.exceptions import NonRetryableError, RateLimitExceeded, TemporaryError, ValidationError

logger = logging.getLogger(__name__)

@shared_task(
    base=SentDMBaseTask, bind=True, name="tasks.messages.send_single_message",
    queue="messages", max_retries=3, default_retry_delay=60,
)
def send_single_message(
    self, phone_number: str, template_id: str,
    variables: Optional[Dict[str, Any]] = None,
    channels: Optional[list] = None, track_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Send a single message with full retry logic and error handling.

    Args:
        phone_number: Recipient number; must be non-empty and start with "+".
        template_id: Template to send.
        variables: Template variables; ``None`` is sent as an empty dict.
        channels: Optional channel list passed through to the client.
        track_id: Optional caller-supplied id echoed back in the result.

    Returns:
        A dict with ``success``, ``message_id``, ``status``, ``track_id``
        and ``phone_number``.

    Raises:
        ValidationError: If *phone_number* fails the format check.
        TemporaryError: If the soft time limit is hit while sending.
        NonRetryableError: If the client is misconfigured or the error text
            marks the request as invalid.
        RateLimitExceeded: If rate limiting persists after all retries.
    """
    # Validate inputs
    if not phone_number or not phone_number.startswith("+"):
        raise ValidationError("Invalid phone number format. Must be E.164")

    try:
        result = self.sent_client.messages.send(
            phone_number=phone_number, template_id=template_id,
            variables=variables or {}, channels=channels,
        )
        return {
            "success": True,
            "message_id": result.data.id if hasattr(result, "data") else None,
            "status": "sent", "track_id": track_id, "phone_number": phone_number,
        }
    except SoftTimeLimitExceeded as exc:
        raise TemporaryError("Task timed out", error_code="TIMEOUT") from exc
    except NonRetryableError:
        # Already classified as permanent (e.g. the missing API key raised
        # by ``sent_client``); must not reach the generic retry branch below.
        raise
    except Exception as exc:
        # The failure is classified by matching on the error text.
        error_msg = str(exc)
        lowered = error_msg.lower()
        if "rate limit" in lowered or "429" in error_msg:
            # Exponential backoff: 60s, 120s, 240s, ...
            retry_after = 60 * (2 ** self.request.retries)
            # Schedule a retry; once retries are exhausted Celery raises the
            # RateLimitExceeded passed as ``exc``.
            raise self.retry(
                exc=RateLimitExceeded(retry_after=retry_after),
                countdown=retry_after,
            )
        if "invalid" in lowered or "400" in error_msg:
            raise NonRetryableError(f"Invalid request: {error_msg}", "INVALID_REQUEST") from exc
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc)
        raise

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.messages.send_templated_batch", queue="messages")
def send_templated_batch(
    self, recipients: list, template_id: str, batch_variables: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Queue one ``send_single_message`` task per recipient.

    Each recipient's own ``variables`` are merged over *batch_variables*.
    A failure to queue one recipient is recorded and does not stop the rest.

    Returns:
        A summary dict with ``total``, ``queued``, ``failed`` and ``errors``.
    """
    total = len(recipients)
    queued = 0
    errors = []

    for entry in recipients:
        try:
            send_single_message.delay(
                phone_number=entry.get("phone_number"),
                template_id=template_id,
                variables={**(batch_variables or {}), **entry.get("variables", {})},
            )
        except Exception as err:
            errors.append({"recipient": entry.get("phone_number"), "error": str(err)})
        else:
            queued += 1

    return {"total": total, "queued": queued, "failed": len(errors), "errors": errors}

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.messages.retry_failed_message", queue="messages")
def retry_failed_message(self, message_log_id: str) -> Dict[str, Any]:
    """Retry a previously failed message by log ID.

    Placeholder: the body is currently empty, so calling this task returns
    ``None`` rather than the annotated ``Dict[str, Any]``.

    Args:
        message_log_id: Identifier of the message log entry to retry.
    """
    # TODO: implement the retry (presumably: load the log entry and re-send
    # the message — confirm the intended behaviour).
    pass  # Add your implementation

Bulk Operation Tasks

# tasks/bulk.py
import logging
from typing import List, Dict, Any, Optional
from celery import shared_task, group, chord
from celery_app.base_task import SentDMBaseTask
from tasks.messages import send_single_message

logger = logging.getLogger(__name__)

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.bulk.send_bulk_messages", queue="bulk")
def send_bulk_messages(
    self, recipients: List[Dict], template_id: str,
    variables: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Fan a template out to many recipients as one Celery group.

    Each recipient's own ``variables`` are merged over the shared
    *variables*. An empty recipient list returns early without dispatching.

    Returns:
        ``{"success": True, "queued": 0}`` for an empty list, otherwise a
        dict with ``success``, ``group_id`` and ``total_tasks``.
    """
    if not recipients:
        return {"success": True, "queued": 0}

    shared_vars = variables or {}
    pending = []
    for entry in recipients:
        pending.append(
            send_single_message.s(
                phone_number=entry["phone_number"],
                template_id=template_id,
                variables={**shared_vars, **entry.get("variables", {})},
            )
        )

    dispatched = group(pending).apply_async()

    return {
        "success": True, "group_id": dispatched.id,
        "total_tasks": len(pending),
    }

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.bulk.validate_contacts", queue="bulk")
def validate_contacts(self, contacts: List[Dict]) -> Dict[str, List]:
    """Validate and filter contacts by phone number format.

    A contact is valid when its ``phone_number``, after stripping
    surrounding whitespace, is "+" followed by 2 to 15 ASCII digits with a
    non-zero first digit. A missing, ``None`` or non-string phone number is
    reported as invalid instead of aborting the whole batch.

    Returns:
        ``{"valid": [...], "invalid": [...]}``. Valid entries are copies of
        the contact with the stripped number; invalid entries hold the
        original contact and a reason.
    """
    import re
    valid, invalid = [], []
    # re.ASCII keeps \d to 0-9; by default it also matches other Unicode digits.
    phone_pattern = re.compile(r"^\+[1-9]\d{1,14}$", re.ASCII)

    for contact in contacts:
        raw_phone = contact.get("phone_number")
        # An explicit None (or any non-string value) has no .strip().
        phone = raw_phone.strip() if isinstance(raw_phone, str) else ""
        if phone_pattern.fullmatch(phone):
            valid.append({**contact, "phone_number": phone})
        else:
            invalid.append({"contact": contact, "reason": "Invalid phone format"})

    return {"valid": valid, "invalid": invalid}

# ... additional bulk task implementations

Canvas Workflows

# tasks/workflows.py
"""
Celery Canvas workflow examples:
- chain: Sequential execution (A → B → C)
- group: Parallel execution ([A, B, C])
- chord: Group with callback ([A, B, C] → D)
"""

import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from celery import shared_task, chain, group, chord
from celery_app.base_task import SentDMBaseTask
from tasks.messages import send_single_message

logger = logging.getLogger(__name__)

# ============================================================================
# Chain Workflow (Sequential)
# ============================================================================

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.welcome_sequence", queue="workflows")
def welcome_sequence(self, phone_number: str, user_name: str) -> Dict[str, Any]:
    """Start the three-message welcome chain for one user.

    The later steps are immutable signatures (``.si``), so they do not
    receive the previous step's result, and each carries its own countdown.

    Returns:
        A dict with ``success``, the chain's ``workflow_id`` and the ordered
        template ``sequence``.
    """
    greeting = send_single_message.s(
        phone_number=phone_number, template_id="welcome", variables={"name": user_name}
    )
    tips = send_single_message.si(
        phone_number=phone_number, template_id="onboarding-tips", variables={"name": user_name}
    ).set(countdown=60 * 60)  # 1 hour delay
    highlight = send_single_message.si(
        phone_number=phone_number, template_id="feature-highlight", variables={"name": user_name}
    ).set(countdown=24 * 60 * 60)  # 24 hour delay

    async_result = chain(greeting, tips, highlight).apply_async()
    return {
        "success": True, "workflow_id": async_result.id,
        "sequence": ["welcome", "onboarding-tips", "feature-highlight"],
    }

# ============================================================================
# Group Workflow (Parallel)
# ============================================================================

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.multi_channel_notify", queue="workflows")
def multi_channel_notify(self, phone_number: str, message_content: Dict) -> Dict[str, Any]:
    """Dispatch the same message once per channel as a parallel group.

    Args:
        phone_number: Recipient number.
        message_content: Must contain ``template_id``; may contain
            ``variables``.

    Returns:
        A dict with ``success``, the ``group_id`` and the ``channels`` used.
    """
    channels = ["whatsapp", "sms"]

    per_channel = []
    for channel in channels:
        per_channel.append(
            send_single_message.s(
                phone_number=phone_number, template_id=message_content["template_id"],
                variables=message_content.get("variables", {}), channels=[channel],
            )
        )

    group_result = group(per_channel).apply_async()
    return {"success": True, "group_id": group_result.id, "channels": channels}

# ============================================================================
# Chord Workflow (Parallel + Callback)
# ============================================================================

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.campaign_with_report", queue="workflows")
def campaign_with_report(
    self, recipients: List[Dict], template_id: str, campaign_id: str,
) -> Dict[str, Any]:
    """Send a campaign as a chord: one message per recipient, then a report.

    The chord body is ``generate_campaign_report``, which receives the list
    of message results along with *campaign_id* and the recipient count.

    Returns:
        A dict with ``success``, ``chord_id``, ``campaign_id`` and the
        number of ``recipients``.
    """
    header = []
    for entry in recipients:
        header.append(
            send_single_message.s(
                phone_number=entry["phone_number"], template_id=template_id, variables=entry.get("variables", {})
            )
        )

    report_step = generate_campaign_report.s(campaign_id=campaign_id, total_recipients=len(recipients))
    chord_result = chord(header)(report_step)
    return {
        "success": True,
        "chord_id": chord_result.id,
        "campaign_id": campaign_id,
        "recipients": len(recipients),
    }

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.generate_campaign_report", queue="workflows")
def generate_campaign_report(
    self, results: List[Dict], campaign_id: str, total_recipients: int,
) -> Dict[str, Any]:
    """Summarise per-message results into a campaign report.

    A result counts as successful when it is a dict with a truthy
    ``"success"`` entry; every other item in *results* counts as failed.
    The success rate is relative to *total_recipients* and is 0 when that
    is zero.
    """
    successful = 0
    for outcome in results:
        if isinstance(outcome, dict) and outcome.get("success"):
            successful += 1
    failed = len(results) - successful

    if total_recipients:
        success_rate = round(successful / total_recipients * 100, 2)
    else:
        success_rate = 0

    summary = {
        "total": total_recipients, "successful": successful, "failed": failed,
        "success_rate": success_rate,
    }
    report = {
        "campaign_id": campaign_id,
        "timestamp": datetime.utcnow().isoformat(),
        "summary": summary,
    }
    logger.info(f"Campaign report generated: {report['summary']}")
    return report

# ============================================================================
# Complex Workflow
# ============================================================================

@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.escalation_workflow", queue="workflows")
def escalation_workflow(self, alert_id: str, escalation_levels: List[Dict]) -> Dict[str, Any]:
    """Multi-level escalation workflow with delays between levels.

    Builds one immutable ``send_single_message`` signature per level and
    runs them as a chain. Every level after the first is delayed by its
    ``delay_seconds`` (default 300).

    Args:
        alert_id: Alert identifier, passed to each message as a variable.
        escalation_levels: One dict per level with ``phone_number`` and
            ``template_id``, plus optional ``variables`` and ``delay_seconds``.

    Returns:
        A dict with ``success``, ``workflow_id``, ``alert_id`` and
        ``levels``. With no levels nothing is dispatched and
        ``workflow_id`` is ``None``.
    """
    if not escalation_levels:
        # An empty chain has no result to read an id from.
        return {"success": True, "workflow_id": None, "alert_id": alert_id, "levels": 0}

    tasks = []
    for i, level in enumerate(escalation_levels):
        task = send_single_message.si(
            phone_number=level["phone_number"], template_id=level["template_id"],
            variables={"alert_id": alert_id, "level": i + 1, **level.get("variables", {})},
        )
        if i > 0:
            # The first level fires immediately; later ones wait.
            task = task.set(countdown=level.get("delay_seconds", 300))
        tasks.append(task)

    workflow = chain(*tasks)
    result = workflow.apply_async()
    return {"success": True, "workflow_id": result.id, "alert_id": alert_id, "levels": len(escalation_levels)}

# ... additional workflow implementations (ab_test_campaign, broadcast_to_segments, etc.)

Testing

# tests/conftest.py
import pytest
import os

# Point Celery at in-memory transports for the test run. These are set at
# import time of conftest because CeleryConfig reads CELERY_BROKER_URL and
# CELERY_RESULT_BACKEND via os.getenv when its class body is executed.
os.environ["CELERY_BROKER_URL"] = "memory://"
os.environ["CELERY_RESULT_BACKEND"] = "cache+memory://"

@pytest.fixture(scope="session")
def celery_config():
    """Session-wide Celery settings for tests.

    Uses an in-memory broker and result backend, eager task execution and
    JSON serialization.
    """
    settings = dict(
        broker_url="memory://",
        result_backend="cache+memory://",
        task_always_eager=True,
        task_serializer="json",
    )
    return settings

@pytest.fixture
def mock_sent_client(monkeypatch):
    """Mock SentDM client.

    Sets ``SENT_DM_API_KEY`` and replaces ``sent_dm.SentDm`` with a factory
    that always returns one recording mock. The mock stores the keyword
    arguments of every ``send`` call in ``calls``.
    """
    class MockResult:
        def __init__(self):
            self.data = type("Data", (), {"id": "msg_123"})()

    class MockSentClient:
        def __init__(self):
            self.calls = []
            # The task calls ``client.messages.send(...)``, so the mock must
            # expose ``send`` under a ``messages`` attribute as well.
            self.messages = self

        def send(self, **kwargs):
            self.calls.append(kwargs)
            return MockResult()

    mock = MockSentClient()
    monkeypatch.setenv("SENT_DM_API_KEY", "test_key")
    monkeypatch.setattr("sent_dm.SentDm", lambda *a, **k: mock)
    return mock

# tests/test_tasks.py
import pytest
from tasks.messages import send_single_message
from tasks.bulk import validate_contacts
from celery_app.exceptions import ValidationError

class TestSendSingleMessage:
    """Tests for the ``send_single_message`` task body."""

    def test_successful_send(self, celery_app, mock_sent_client):
        """A well-formed request reports success and echoes the number."""
        outcome = send_single_message.run(
            phone_number="+1234567890",
            template_id="welcome",
            variables={"name": "John"},
        )
        assert outcome["success"] is True
        assert outcome["phone_number"] == "+1234567890"

    def test_invalid_phone_format(self, celery_app):
        """A number without a leading "+" is rejected with ValidationError."""
        with pytest.raises(ValidationError) as caught:
            send_single_message.run(phone_number="1234567890", template_id="welcome")
        message = str(caught.value)
        assert "Invalid phone number format" in message

class TestBulkOperations:
    """Tests for the bulk-operation tasks."""

    def test_validate_contacts(self, celery_app):
        """One well-formed and one malformed number are split one-to-one."""
        good = {"phone_number": "+1234567890", "name": "John"}
        bad = {"phone_number": "invalid", "name": "Jane"}

        outcome = validate_contacts.run([good, bad])

        assert len(outcome["valid"]) == 1
        assert len(outcome["invalid"]) == 1

# ... additional test classes (TestRateLimiter, TestWorkflows, etc.)

Docker Compose

# docker-compose.yml
version: "3.8"

services:
  # Message broker (db 0) and result backend (db 1).
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    volumes: [redis_data:/data]

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: celery
      POSTGRES_PASSWORD: celery
      POSTGRES_DB: celery_tasks
    ports: ["5432:5432"]
    volumes: [postgres_data:/var/lib/postgresql/data]

  # Monitoring UI. Set FLOWER_BASIC_AUTH=user:password to override the
  # default credentials; do not keep admin:admin outside local development.
  flower:
    build: .
    command: celery -A celery_app.app flower --port=5555 --basic_auth=${FLOWER_BASIC_AUTH:-admin:admin}
    ports: ["5555:5555"]
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    # Flower connects to the broker, so start redis first.
    depends_on: [redis]

  # Consumes the three queues named in task_routes.
  worker:
    build: .
    command: celery -A celery_app.app worker --loglevel=info --queues=messages,bulk,workflows
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
      DATABASE_URL: postgresql://celery:celery@postgres:5432/celery_tasks
      SENT_DM_API_KEY: ${SENT_DM_API_KEY}
    depends_on: [redis, postgres]

  # Periodic-task scheduler (beat_schedule).
  beat:
    build: .
    command: celery -A celery_app.app beat --loglevel=info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on: [redis, postgres]

volumes:
  redis_data:
  postgres_data:

Environment Variables

# Required
SENT_DM_API_KEY=your_api_key_here

# Broker & Backend
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1

# Database
DATABASE_URL=postgresql://user:pass@localhost/celery_tasks

# Environment
CELERY_ENV=development  # or production

Running Celery

# Development
redis-server
# Listen on every queue named in task_routes (messages, bulk, workflows);
# a queue that no worker consumes leaves its tasks waiting forever.
celery -A celery_app.app worker --loglevel=info --queues=messages,bulk,workflows
celery -A celery_app.app beat --loglevel=info

# Production (Docker)
docker-compose up -d
docker-compose logs -f worker

Usage Examples

from tasks.messages import send_single_message
from tasks.workflows import welcome_sequence, campaign_with_report
from datetime import datetime, timedelta

# Basic task
task = send_single_message.delay(
    phone_number="+1234567890", template_id="welcome", variables={"name": "John"}
)

# Workflow
workflow = welcome_sequence.delay(phone_number="+1234567890", user_name="John")

# Scheduled task: run one hour from now. A fixed calendar date goes stale,
# and an eta in the past makes the task run immediately.
send_single_message.apply_async(
    kwargs={"phone_number": "+1234567890", "template_id": "reminder"},
    eta=datetime.utcnow() + timedelta(hours=1),
)

# With retry policy
# NOTE(review): per the Celery calling guide, retry/retry_policy here control
# re-sending the task message when the broker connection fails, not
# re-running a task that raised — confirm this is the intended meaning.
send_single_message.apply_async(
    kwargs={"phone_number": "+1234567890", "template_id": "important"},
    retry=True,
    retry_policy={"max_retries": 5, "interval_start": 60, "interval_max": 600},
)

Next Steps

On this page