Python SDK
Celery Integration
Complete Celery integration with task base classes, retry strategies, workflow orchestration, monitoring, and database integration for production workloads.
This guide covers Celery 5.3+ best practices with Redis/RabbitMQ broker, result backend, task signals, and Flower monitoring.
Project Structure
project/
├── celery_app/
│ ├── __init__.py
│ ├── app.py # Celery app configuration
│ ├── config.py # Configuration classes
│ ├── base_task.py # Base task with retry logic
│ ├── rate_limiter.py # Token bucket rate limiting
│ └── exceptions.py # Custom exceptions
├── tasks/
│ ├── __init__.py
│ ├── messages.py # Message sending tasks
│ ├── bulk.py # Bulk operation tasks
│ └── workflows.py # Canvas workflow definitions
├── models/
│ └── message_log.py # Database models
├── tests/
│ ├── conftest.py # pytest fixtures
│ └── test_tasks.py # Task tests
├── docker-compose.yml # Stack configuration
└── requirements.txt
Installation
pip install celery[redis] sentdm sqlalchemy psycopg2-binary flower prometheus-client
Configuration
Celery Configuration
# celery_app/config.py
import os
from typing import Dict, Any
class CeleryConfig:
    """Base Celery configuration (attributes are read by app.config_from_object)."""

    # Broker / result backend
    broker_url: str = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
    broker_connection_retry_on_startup: bool = True
    result_backend: str = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
    result_expires: int = 3600 * 24  # keep task results for 24 hours

    # Serialization: JSON only, so no pickle deserialization of broker payloads.
    task_serializer: str = "json"
    accept_content: list = ["json"]

    # Execution behaviour
    task_track_started: bool = True
    task_time_limit: int = 300       # hard kill after 5 minutes
    task_soft_time_limit: int = 240  # SoftTimeLimitExceeded raised after 4 minutes
    task_acks_late: bool = True      # ack after completion so crashed tasks are redelivered
    worker_prefetch_multiplier: int = 1
    worker_max_tasks_per_child: int = 1000  # recycle workers to bound memory growth

    # FIX: route values are option dicts ({"queue": ...}), not plain strings,
    # so the annotation must be Dict[str, Dict[str, str]].
    task_routes: Dict[str, Dict[str, str]] = {
        "tasks.messages.*": {"queue": "messages"},
        "tasks.bulk.*": {"queue": "bulk"},
        "tasks.workflows.*": {"queue": "workflows"},
    }

    beat_schedule: Dict[str, Any] = {
        "cleanup-old-logs": {
            "task": "tasks.scheduled.cleanup_message_logs",
            "schedule": 3600 * 24,  # daily
            "args": (30,),          # retention window in days
        },
        "retry-failed-messages": {
            "task": "tasks.scheduled.retry_failed_messages",
            "schedule": 300,  # every 5 minutes
        },
    }

    # Application settings carried alongside the Celery options.
    database_url: str = os.getenv("DATABASE_URL", "postgresql://localhost/celery_tasks")
    sent_dm_api_key: str = os.getenv("SENT_DM_API_KEY", "")
class ProductionConfig(CeleryConfig):
    """Production configuration"""
    # Redis transport tuning.
    broker_transport_options: Dict[str, Any] = {
        # Unacked tasks become visible for redelivery after 12h; must exceed
        # the longest expected task runtime (relevant with task_acks_late).
        "visibility_timeout": 43200,
        "queue_order_strategy": "priority",
    }
    # Enable remote control and task events so Flower / monitoring tools
    # can inspect and observe the workers.
    worker_enable_remote_control: bool = True
    worker_send_task_events: bool = True
    task_send_sent_event: bool = True
def get_config():
env = os.getenv("CELERY_ENV", "development")
return ProductionConfig() if env == "production" else CeleryConfig()Celery App Factory
# celery_app/app.py
from celery import Celery
from .config import get_config
def create_celery_app(app_name: str = "sent_dm_tasks") -> Celery:
    """Build a Celery application wired to the environment-selected config."""
    application = Celery(app_name)
    application.config_from_object(get_config())
    # Register task modules; each maps onto its own queue via task_routes.
    task_modules = [
        "tasks.messages",
        "tasks.bulk",
        "tasks.workflows",
    ]
    application.autodiscover_tasks(task_modules)
    return application
app = create_celery_app()
Custom Exceptions
# celery_app/exceptions.py
from typing import Optional, Dict, Any
class SentDMTaskError(Exception):
    """Base exception for SentDM Celery tasks.

    Carries a machine-readable error_code, optional structured details,
    and a retryable flag that retry logic can inspect.
    """

    def __init__(self, message: str, error_code: Optional[str] = None,
                 details: Optional[Dict] = None, retryable: bool = False):
        super().__init__(message)
        self.message = message
        self.error_code = error_code if error_code else "TASK_ERROR"
        self.details = details if details else {}
        self.retryable = retryable
class NonRetryableError(SentDMTaskError):
    """Error that should not be retried (4xx errors)"""
    def __init__(self, message: str, error_code: str = "NON_RETRYABLE", details=None):
        super().__init__(message, error_code, details, retryable=False)

class ValidationError(NonRetryableError):
    """Input validation failed — never retried.

    FIX: tasks/messages.py imports ValidationError from this module and
    raises it for malformed phone numbers, but the class was never defined,
    so importing the tasks module failed with ImportError.
    """
    def __init__(self, message: str, details=None):
        super().__init__(message, "VALIDATION_ERROR", details)
class RateLimitExceeded(SentDMTaskError):
    """Raised when the rate limiter denies a request; callers should back off."""

    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        details = {"retry_after": retry_after}
        super().__init__(message, "RATE_LIMIT_EXCEEDED", details, True)
        # Exposed directly so retry handlers need not dig into details.
        self.retry_after = retry_after
class TemporaryError(SentDMTaskError):
"""Temporary error - should be retried"""
def __init__(self, message: str, error_code: str = "TEMPORARY_ERROR", details=None):
super().__init__(message, error_code, details, retryable=True)Token Bucket Rate Limiter
# celery_app/rate_limiter.py
import time
import threading
from typing import Optional, Dict
from dataclasses import dataclass
@dataclass
class TokenBucket:
    """Token bucket for rate limiting.

    `tokens` refills continuously at `fill_rate` tokens/second up to
    `capacity`; `consume` lazily applies the refill before spending.
    """
    capacity: int
    tokens: float
    fill_rate: float
    last_update: float

    def _refill(self, now: float) -> None:
        # Credit tokens accrued since the last update, clamped to capacity.
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.fill_rate)
        self.last_update = now

    def consume(self, tokens: int = 1) -> bool:
        """Try to spend `tokens`; returns True on success, False otherwise."""
        self._refill(time.time())
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def get_wait_time(self, tokens: int = 1) -> float:
        """Seconds until `tokens` could be consumed (0.0 if available now).

        FIX: RateLimiter.acquire called this method but it was never
        defined, raising AttributeError on any contended bucket.
        """
        if tokens > self.capacity or self.fill_rate <= 0:
            return float("inf")  # waiting can never satisfy the request
        now = time.time()
        available = min(self.capacity,
                        self.tokens + (now - self.last_update) * self.fill_rate)
        return max(0.0, (tokens - available) / self.fill_rate)


class RateLimiter:
    """Thread-safe rate limiter with one token bucket per key."""

    def __init__(self):
        self._buckets: Dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def acquire(self, key: str, capacity: int, fill_rate: float,
                tokens: int = 1, timeout: Optional[float] = None) -> bool:
        """Block until `tokens` are consumed from `key`'s bucket.

        Returns False when the request cannot be satisfied within `timeout`
        seconds, or can never be satisfied (tokens > capacity or zero fill
        rate). `capacity`/`fill_rate` only apply when the bucket is created.
        """
        start_time = time.time()
        while True:
            with self._lock:
                # FIX: create the bucket under the lock; the original did the
                # get/create outside it, so two threads could race and install
                # two different buckets for the same key.
                bucket = self._buckets.get(key)
                if bucket is None:
                    bucket = TokenBucket(capacity, capacity, fill_rate, time.time())
                    self._buckets[key] = bucket
                if bucket.consume(tokens):
                    return True
                wait_time = bucket.get_wait_time(tokens)
            if wait_time == float("inf"):
                # FIX: unsatisfiable request — fail fast instead of spinning.
                return False
            # FIX: `timeout is not None` so an explicit timeout of 0 is honoured.
            if timeout is not None and (time.time() - start_time + wait_time > timeout):
                return False
            if wait_time > 0:
                # Sleep in short slices so a refilling bucket is re-checked promptly.
                time.sleep(min(wait_time, 0.1))
rate_limiter = RateLimiter()
Base Task Class
# celery_app/base_task.py
import logging
from typing import Optional

from celery import Task
from celery.exceptions import MaxRetriesExceededError, SoftTimeLimitExceeded

from .exceptions import NonRetryableError, RateLimitExceeded, TemporaryError
from .rate_limiter import rate_limiter
logger = logging.getLogger(__name__)
class SentDMBaseTask(Task):
"""Base task class with retry logic, rate limiting, and error handling"""
abstract = True
max_retries = 3
default_retry_delay = 60
retry_backoff = True
retry_backoff_max = 600
retry_jitter = True
rate_limit_key: Optional[str] = None
rate_limit_capacity: int = 100
rate_limit_fill_rate: float = 10.0
soft_time_limit = 240
time_limit = 300
def __init__(self):
self._sent_client = None
@property
def sent_client(self):
"""Lazy initialization of SentDM client"""
if self._sent_client is None:
from sent_dm import SentDm
import os
api_key = os.getenv("SENT_DM_API_KEY")
if not api_key:
raise NonRetryableError("SENT_DM_API_KEY not configured")
self._sent_client = SentDm(api_key)
return self._sent_client
def apply_rate_limit(self, tokens: int = 1) -> None:
if not self.rate_limit_key:
return
acquired = rate_limiter.acquire(
key=self.rate_limit_key,
capacity=self.rate_limit_capacity,
fill_rate=self.rate_limit_fill_rate,
tokens=tokens,
timeout=30,
)
if not acquired:
raise RateLimitExceeded(retry_after=int(1 / self.rate_limit_fill_rate))
def call(self, *args, **kwargs):
self.apply_rate_limit()
return super().call(*args, **kwargs)
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.warning(f"Task {task_id} retry {self.request.retries}/{self.max_retries}: {exc}")
super().on_retry(exc, task_id, args, kwargs, einfo)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error(f"Task {task_id} failed: {exc}", extra={
"task_id": task_id, "exception": str(exc),
})
super().on_failure(exc, task_id, args, kwargs, einfo)
def on_success(self, retval, task_id, args, kwargs):
logger.info(f"Task {task_id} completed successfully")
super().on_success(retval, task_id, args, kwargs)Message Tasks
# tasks/messages.py
import logging
from typing import Optional, Dict, Any
from datetime import datetime
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery_app.base_task import SentDMBaseTask
from celery_app.exceptions import NonRetryableError, RateLimitExceeded, TemporaryError, ValidationError
logger = logging.getLogger(__name__)
@shared_task(
    base=SentDMBaseTask, bind=True, name="tasks.messages.send_single_message",
    queue="messages", max_retries=3, default_retry_delay=60,
)
def send_single_message(
    self, phone_number: str, template_id: str,
    variables: Optional[Dict[str, Any]] = None,
    channels: Optional[list] = None, track_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Send a single message with full retry logic and error handling

    Args:
        phone_number: recipient in E.164 format ("+" prefix required).
        template_id: id of the message template to send.
        variables: template substitution values.
        channels: optional channel override (e.g. ["sms"]).
        track_id: caller-supplied correlation id, echoed back in the result.

    Returns:
        Dict with success, message_id, status, track_id, phone_number.

    Raises:
        ValidationError: malformed phone number (never retried).
        RateLimitExceeded / NonRetryableError / TemporaryError: classified
            from the provider error (see the except blocks below).
    """
    # Validate inputs
    if not phone_number or not phone_number.startswith("+"):
        raise ValidationError("Invalid phone number format. Must be E.164")
    try:
        result = self.sent_client.messages.send(
            phone_number=phone_number, template_id=template_id,
            variables=variables or {}, channels=channels,
        )
        return {
            "success": True,
            # Defensive: tolerate client responses without a .data payload.
            "message_id": result.data.id if hasattr(result, "data") else None,
            "status": "sent", "track_id": track_id, "phone_number": phone_number,
        }
    except SoftTimeLimitExceeded:
        # The base task's soft limit fired mid-send; surface as retryable.
        raise TemporaryError("Task timed out", error_code="TIMEOUT")
    except Exception as exc:
        # Classify provider errors by message content.
        # NOTE(review): string matching is brittle — prefer typed SDK
        # exceptions if the client library exposes them.
        error_msg = str(exc)
        if "rate limit" in error_msg.lower() or "429" in error_msg:
            # Exponential backoff keyed to the current retry attempt.
            retry_after = 60 * (2 ** self.request.retries)
            raise RateLimitExceeded(retry_after=retry_after)
        elif "invalid" in error_msg.lower() or "400" in error_msg:
            raise NonRetryableError(f"Invalid request: {error_msg}", "INVALID_REQUEST")
        elif self.request.retries < self.max_retries:
            # Unknown error with retry budget left: let Celery re-deliver.
            raise self.retry(exc=exc)
        else:
            # Retries exhausted — propagate so on_failure fires.
            raise
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.messages.send_templated_batch", queue="messages")
def send_templated_batch(
    self, recipients: list, template_id: str, batch_variables: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Send messages to multiple recipients"""
    summary = {"total": len(recipients), "queued": 0, "failed": 0, "errors": []}
    shared_vars = batch_variables or {}
    for entry in recipients:
        phone = entry.get("phone_number")
        try:
            # Per-recipient variables override the batch-wide ones.
            merged = dict(shared_vars)
            merged.update(entry.get("variables", {}))
            send_single_message.delay(
                phone_number=phone,
                template_id=template_id,
                variables=merged,
            )
        except Exception as e:
            summary["failed"] += 1
            summary["errors"].append({"recipient": phone, "error": str(e)})
        else:
            summary["queued"] += 1
    return summary
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.messages.retry_failed_message", queue="messages")
def retry_failed_message(self, message_log_id: str) -> Dict[str, Any]:
    """Retry a previously failed message by log ID"""
    # Implementation for retrying failed messages
    # TODO: load the message log row by `message_log_id`, re-dispatch it via
    # send_single_message, and update the log status. Left as a stub here.
    pass # Add your implementation
Bulk Operation Tasks
# tasks/bulk.py
import logging
from typing import List, Dict, Any, Optional
from celery import shared_task, group, chord
from celery_app.base_task import SentDMBaseTask
from tasks.messages import send_single_message
logger = logging.getLogger(__name__)
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.bulk.send_bulk_messages", queue="bulk")
def send_bulk_messages(
    self, recipients: List[Dict], template_id: str,
    variables: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Send messages to multiple users using groups for parallel processing"""
    if not recipients:
        return {"success": True, "queued": 0}
    base_vars = variables or {}
    signatures = []
    for recipient in recipients:
        # Per-recipient variables override the campaign-wide ones.
        merged = {**base_vars, **recipient.get("variables", {})}
        signatures.append(
            send_single_message.s(
                phone_number=recipient["phone_number"],
                template_id=template_id,
                variables=merged,
            )
        )
    async_result = group(signatures).apply_async()
    return {
        "success": True, "group_id": async_result.id,
        "total_tasks": len(signatures),
    }
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.bulk.validate_contacts", queue="bulk")
def validate_contacts(self, contacts: List[Dict]) -> Dict[str, List]:
    """Validate and filter contacts

    Splits `contacts` into E.164-valid and invalid entries; valid entries
    get their phone_number normalized (surrounding whitespace stripped).
    """
    import re
    valid, invalid = [], []
    # E.164: "+", leading non-zero digit, at most 15 digits in total.
    phone_pattern = re.compile(r"^\+[1-9]\d{1,14}$")
    for contact in contacts:
        # FIX: guard against phone_number explicitly set to None, which
        # crashed .strip(); the original only handled a missing key.
        phone = (contact.get("phone_number") or "").strip()
        if phone_pattern.match(phone):
            valid.append({**contact, "phone_number": phone})
        else:
            invalid.append({"contact": contact, "reason": "Invalid phone format"})
    return {"valid": valid, "invalid": invalid}
# ... additional bulk task implementations
Canvas Workflows
# tasks/workflows.py
"""
Celery Canvas workflow examples:
- chain: Sequential execution (A → B → C)
- group: Parallel execution ([A, B, C])
- chord: Group with callback ([A, B, C] → D)
"""
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from celery import shared_task, chain, group, chord
from celery_app.base_task import SentDMBaseTask
from tasks.messages import send_single_message
logger = logging.getLogger(__name__)
# ============================================================================
# Chain Workflow (Sequential)
# ============================================================================
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.welcome_sequence", queue="workflows")
def welcome_sequence(self, phone_number: str, user_name: str) -> Dict[str, Any]:
    """Multi-step welcome sequence using chain"""
    # Later steps use .si (immutable signature) so the previous step's
    # return value is not injected as an argument.
    immediate = send_single_message.s(
        phone_number=phone_number, template_id="welcome", variables={"name": user_name}
    )
    after_one_hour = send_single_message.si(
        phone_number=phone_number, template_id="onboarding-tips", variables={"name": user_name}
    ).set(countdown=3600)
    after_one_day = send_single_message.si(
        phone_number=phone_number, template_id="feature-highlight", variables={"name": user_name}
    ).set(countdown=86400)
    async_result = chain(immediate, after_one_hour, after_one_day).apply_async()
    return {
        "success": True, "workflow_id": async_result.id,
        "sequence": ["welcome", "onboarding-tips", "feature-highlight"],
    }
# ============================================================================
# Group Workflow (Parallel)
# ============================================================================
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.multi_channel_notify", queue="workflows")
def multi_channel_notify(self, phone_number: str, message_content: Dict) -> Dict[str, Any]:
    """Send same message via multiple channels in parallel"""
    channels = ["whatsapp", "sms"]
    template = message_content["template_id"]
    payload_vars = message_content.get("variables", {})
    per_channel = []
    for channel in channels:
        # One signature per channel; the group runs them concurrently.
        per_channel.append(
            send_single_message.s(
                phone_number=phone_number, template_id=template,
                variables=payload_vars, channels=[channel],
            )
        )
    outcome = group(per_channel).apply_async()
    return {"success": True, "group_id": outcome.id, "channels": channels}
# ============================================================================
# Chord Workflow (Parallel + Callback)
# ============================================================================
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.campaign_with_report", queue="workflows")
def campaign_with_report(
    self, recipients: List[Dict], template_id: str, campaign_id: str,
) -> Dict[str, Any]:
    """Send campaign messages and generate report when complete"""
    # Chord header: one send per recipient, executed in parallel.
    header = [
        send_single_message.s(
            phone_number=entry["phone_number"], template_id=template_id,
            variables=entry.get("variables", {}),
        )
        for entry in recipients
    ]
    # Chord callback: receives the list of all send results once done.
    report_cb = generate_campaign_report.s(
        campaign_id=campaign_id, total_recipients=len(recipients)
    )
    chord_result = chord(header)(report_cb)
    return {"success": True, "chord_id": chord_result.id, "campaign_id": campaign_id, "recipients": len(recipients)}
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.generate_campaign_report", queue="workflows")
def generate_campaign_report(
    self, results: List[Dict], campaign_id: str, total_recipients: int,
) -> Dict[str, Any]:
    """Generate campaign report from all message results"""
    ok_count = 0
    for item in results:
        if isinstance(item, dict) and item.get("success"):
            ok_count += 1
    error_count = len(results) - ok_count
    if total_recipients:
        success_rate = round(ok_count / total_recipients * 100, 2)
    else:
        success_rate = 0
    # NOTE(review): datetime.utcnow() is deprecated on Python 3.12+;
    # consider datetime.now(timezone.utc) when imports can change.
    report = {
        "campaign_id": campaign_id,
        "timestamp": datetime.utcnow().isoformat(),
        "summary": {
            "total": total_recipients, "successful": ok_count, "failed": error_count,
            "success_rate": success_rate,
        },
    }
    logger.info(f"Campaign report generated: {report['summary']}")
    return report
# ============================================================================
# Complex Workflow
# ============================================================================
@shared_task(base=SentDMBaseTask, bind=True, name="tasks.workflows.escalation_workflow", queue="workflows")
def escalation_workflow(self, alert_id: str, escalation_levels: List[Dict]) -> Dict[str, Any]:
    """Multi-level escalation workflow with delays between levels"""
    steps = []
    for index, level in enumerate(escalation_levels):
        sig = send_single_message.si(
            phone_number=level["phone_number"], template_id=level["template_id"],
            variables={"alert_id": alert_id, "level": index + 1, **level.get("variables", {})},
        )
        # Every level after the first waits before firing (default 5 minutes).
        if index:
            sig = sig.set(countdown=level.get("delay_seconds", 300))
        steps.append(sig)
    outcome = chain(*steps).apply_async()
    return {"success": True, "workflow_id": outcome.id, "alert_id": alert_id, "levels": len(escalation_levels)}
# ... additional workflow implementations (ab_test_campaign, broadcast_to_segments, etc.)
Testing
# tests/conftest.py
import pytest
import os
os.environ["CELERY_BROKER_URL"] = "memory://"
os.environ["CELERY_RESULT_BACKEND"] = "cache+memory://"
@pytest.fixture(scope="session")
def celery_config():
    """In-memory broker/backend with eager execution for fast, isolated tests."""
    return {
        "broker_url": "memory://",
        "result_backend": "cache+memory://",
        "task_always_eager": True,
        "task_serializer": "json",
    }
@pytest.fixture
def mock_sent_client(monkeypatch):
    """Mock SentDM client"""
    class MockResult:
        def __init__(self):
            self.data = type("Data", (), {"id": "msg_123"})()

    class MockSentClient:
        def __init__(self):
            self.calls = []

        def send(self, **kwargs):
            # Record every call so tests can assert on the payload.
            self.calls.append(kwargs)
            return MockResult()

    mock = MockSentClient()
    monkeypatch.setenv("SENT_DM_API_KEY", "test_key")
    monkeypatch.setattr("sent_dm.SentDm", lambda *a, **k: mock)
    return mock
# tests/test_tasks.py
import pytest
from tasks.messages import send_single_message
from tasks.bulk import validate_contacts
from celery_app.exceptions import ValidationError
class TestSendSingleMessage:
    def test_successful_send(self, celery_app, mock_sent_client):
        outcome = send_single_message.run(
            phone_number="+1234567890", template_id="welcome", variables={"name": "John"}
        )
        assert outcome["success"] is True
        assert outcome["phone_number"] == "+1234567890"

    def test_invalid_phone_format(self, celery_app):
        # Numbers without the "+" prefix must be rejected before any send.
        with pytest.raises(ValidationError) as exc_info:
            send_single_message.run(phone_number="1234567890", template_id="welcome")
        assert "Invalid phone number format" in str(exc_info.value)
class TestBulkOperations:
    def test_validate_contacts(self, celery_app):
        sample = [
            {"phone_number": "+1234567890", "name": "John"},
            {"phone_number": "invalid", "name": "Jane"},
        ]
        outcome = validate_contacts.run(sample)
        assert len(outcome["valid"]) == 1
        assert len(outcome["invalid"]) == 1
# ... additional test classes (TestRateLimiter, TestWorkflows, etc.)
Docker Compose
# docker-compose.yml
version: "3.8"
services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    volumes: [redis_data:/data]
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: celery
      POSTGRES_PASSWORD: celery
      POSTGRES_DB: celery_tasks
    ports: ["5432:5432"]
    volumes: [postgres_data:/var/lib/postgresql/data]
  flower:
    build: .
    # NOTE: replace admin:admin with real credentials (or an env var) before
    # exposing Flower anywhere non-local.
    command: celery -A celery_app.app flower --port=5555 --basic_auth=admin:admin
    ports: ["5555:5555"]
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on: [redis]  # FIX: flower needs the broker, like worker/beat
  worker:
    build: .
    command: celery -A celery_app.app worker --loglevel=info --queues=messages,bulk,workflows
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/1
      DATABASE_URL: postgresql://celery:celery@postgres:5432/celery_tasks
      SENT_DM_API_KEY: ${SENT_DM_API_KEY}
    depends_on: [redis, postgres]
  beat:
    build: .
    command: celery -A celery_app.app beat --loglevel=info
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
    depends_on: [redis, postgres]
volumes:
  redis_data:
  postgres_data:
Environment Variables
# Required
SENT_DM_API_KEY=your_api_key_here
# Broker & Backend
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# Database
DATABASE_URL=postgresql://user:pass@localhost/celery_tasks
# Environment
CELERY_ENV=development # or production
Running Celery
# Development
redis-server
celery -A celery_app.app worker --loglevel=info --queues=messages,bulk,default
celery -A celery_app.app beat --loglevel=info
# Production (Docker)
docker-compose up -d
docker-compose logs -f worker
Usage Examples
from tasks.messages import send_single_message
from tasks.workflows import welcome_sequence, campaign_with_report
from datetime import datetime, timedelta
# Basic task: fire-and-forget; .delay returns an AsyncResult immediately.
task = send_single_message.delay(
    phone_number="+1234567890", template_id="welcome", variables={"name": "John"}
)
# Workflow
workflow = welcome_sequence.delay(phone_number="+1234567890", user_name="John")
# Scheduled task: eta defers execution until the given datetime.
send_single_message.apply_async(
    kwargs={"phone_number": "+1234567890", "template_id": "reminder"},
    eta=datetime(2024, 6, 15, 9, 0),
)
# With retry policy: overrides the task's retry defaults for this call only.
send_single_message.apply_async(
    kwargs={"phone_number": "+1234567890", "template_id": "important"},
    retry=True,
    retry_policy={"max_retries": 5, "interval_start": 60, "interval_max": 600},
)
Next Steps
- Learn about best practices for production deployments
- Set up webhooks for delivery status updates
- Explore the Python SDK reference for advanced features
- Explore testing strategies for background task validation