Django 와 Celery 를 이용한 대규모 작업 분산처리

📋 이 글에서 다루는 내용

  • Celery 기본 개념: 분산 작업 큐 시스템의 아키텍처와 구성 요소
  • 도입 시점 결정: 언제 Celery + AMQP를 사용해야 하는지 판단하는 8가지 체크리스트
  • Django 프로젝트 설정: Redis/RabbitMQ 브로커 설정과 Celery 앱 구성
  • 작업 분산 패턴: Chunking, Chain, Chord, 우선순위 큐 활용법
  • 실전 예제: 100만 명에게 푸시 알림을 발송하는 대규모 시스템 구현
  • 프로덕션 배포: Supervisor, 모니터링, 로깅, 보안 설정
  • 성능 최적화: Database 최적화, 메모리 관리, Rate Limiting
예상 소요 시간: 30-40분 난이도: 중급 실습 포함:

Introduction

웹 애플리케이션을 개발하다 보면 시간이 오래 걸리는 작업들을 처리해야 하는 경우가 있습니다. 예를 들어, 대용량 파일 처리, 이메일 발송, 데이터 분석, 리포트 생성 등의 작업은 사용자의 요청에 대한 응답 시간을 지연시킬 수 있습니다.

Celery는 Python 기반의 분산 작업 큐 시스템으로, Django와 함께 사용하여 이러한 무거운 작업을 백그라운드에서 비동기적으로 처리할 수 있게 해줍니다. 이 글에서는 Django와 Celery를 이용한 대규모 작업 분산처리 방법을 살펴보겠습니다.

Celery 란?

Celery는 분산 메시지 전달을 기반으로 동작하는 비동기 작업 큐입니다. 주요 특징은 다음과 같습니다:

  • 비동기 처리: 시간이 오래 걸리는 작업을 백그라운드에서 처리
  • 분산 처리: 여러 워커(worker)를 통해 작업을 분산 실행
  • 스케줄링: 주기적인 작업(cron-like) 실행 지원
  • 재시도 및 에러 처리: 실패한 작업에 대한 재시도 메커니즘
  • 모니터링: Flower 등을 통한 실시간 작업 모니터링

Celery 아키텍처

graph LR
    A[Django App<br/>Producer] -->|1. Task 전송| B[Message Broker<br/>RabbitMQ/Redis]
    B -->|2. Task 전달| C[Worker<br/>Process]
    C -->|3. 작업 처리| D[Task Execution]
    D -->|4. 결과 저장| E[Result Backend<br/>Optional]
    E -.->|5. 결과 조회| A

    style A fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#e8f5e9
    style D fill:#f3e5f5
    style E fill:#fce4ec

주요 구성 요소:

  • Producer: 작업을 생성하고 메시지 브로커로 전송 (Django 앱)
  • Message Broker: 작업 메시지를 큐에 저장 (RabbitMQ, Redis 등)
  • Worker: 큐에서 작업을 가져와 실행
  • Result Backend: 작업 결과를 저장 (선택적)

Celery + AMQP 도입을 고려해야 하는 시점

Celery와 AMQP (RabbitMQ) 조합은 강력하지만, 모든 프로젝트에 필요한 것은 아닙니다. 다음 상황에서 도입을 고려해야 합니다.

도입해야 하는 경우

1. 사용자 요청 응답 시간이 중요한 경우

# 문제 상황: 동기 처리로 인한 느린 응답
def send_welcome_email(request):
    user = request.user
    # 이메일 발송에 3-5초 소요
    send_mail(
        'Welcome!',
        'Thank you for signing up.',
        'from@example.com',
        [user.email],
    )
    return JsonResponse({'status': 'ok'})  # 사용자는 5초 대기

# 해결: Celery로 비동기 처리
@shared_task
def send_welcome_email_task(user_id):
    user = User.objects.get(id=user_id)
    send_mail(...)

def send_welcome_email(request):
    send_welcome_email_task.delay(request.user.id)
    return JsonResponse({'status': 'ok'})  # 즉시 응답 (< 100ms)

적용 기준:

  • HTTP 요청 처리 시간이 500ms를 초과하는 경우
  • 사용자가 즉각적인 피드백을 기대하는 작업
  • 타임아웃으로 인한 요청 실패가 발생하는 경우

2. 대량의 작업을 병렬로 처리해야 하는 경우

# 시나리오: 1만 개의 이미지 리사이징
images = Image.objects.filter(status='pending')  # 10,000개

# 동기 처리: 10,000 × 2초 = 약 5.5시간
for image in images:
    resize_image(image)

# Celery 병렬 처리: 20 워커 사용 시 약 16분
for image in images:
    resize_image_task.delay(image.id)

적용 기준:

  • 동일한 작업을 수천~수만 번 반복해야 하는 경우
  • 작업 간 의존성이 없어 병렬 처리가 가능한 경우
  • 처리 시간을 단축해야 하는 비즈니스 요구사항이 있는 경우

3. 외부 API 호출이 빈번한 경우

# 문제: 외부 API 호출로 인한 블로킹
def process_payment(request):
    # 결제 게이트웨이 API 호출 (2-3초)
    payment_result = stripe.charge.create(...)

    # SMS 발송 API 호출 (1-2초)
    send_sms(user.phone, "Payment confirmed")

    # 회계 시스템 API 호출 (1-2초)
    accounting_system.record_transaction(...)

    return response  # 총 4-7초 소요

# 해결: API 호출을 비동기로 처리
@shared_task
def process_payment_async(payment_id):
    payment_result = stripe.charge.create(...)
    send_sms_task.delay(...)
    record_transaction_task.delay(...)

적용 기준:

  • 외부 API 응답 시간이 1초 이상인 경우
  • API 타임아웃이나 장애가 사용자 경험에 영향을 주는 경우
  • Rate limit으로 인해 요청 속도 조절이 필요한 경우

4. 주기적으로 실행해야 하는 작업이 있는 경우

# Celery Beat으로 스케줄링
CELERY_BEAT_SCHEDULE = {
    'cleanup-old-sessions': {
        'task': 'myapp.tasks.cleanup_old_sessions',
        'schedule': crontab(hour=3, minute=0),  # 매일 새벽 3시
    },
    'send-daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=9, minute=0, day_of_week='1-5'),  # 평일 9시
    },
    'update-cache': {
        'task': 'myapp.tasks.update_cache',
        'schedule': 300.0,  # 5분마다
    },
}

적용 기준:

  • cron 작업을 Python 코드로 관리하고 싶은 경우
  • 작업 실행 이력과 결과를 추적해야 하는 경우
  • 작업 실패 시 재시도나 알림이 필요한 경우

5. 메시지 유실을 방지해야 하는 중요한 작업

AMQP (RabbitMQ)는 메시지 영속성과 안정성이 뛰어나므로 중요한 작업에 적합합니다.

# RabbitMQ 설정: 메시지 영속성 보장
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_TASK_ACKS_LATE = True  # 작업 완료 후 ACK
CELERY_TASK_REJECT_ON_WORKER_LOST = True  # 워커 장애 시 재실행

@shared_task(bind=True, max_retries=5)
def process_payment(self, payment_id):
    try:
        # 중요한 결제 처리
        payment = Payment.objects.get(id=payment_id)
        result = gateway.process(payment)
        payment.status = 'completed'
        payment.save()
    except Exception as exc:
        # 재시도
        raise self.retry(exc=exc, countdown=60)

적용 기준:

  • 금융 거래, 주문 처리 등 데이터 무결성이 중요한 경우
  • 작업 유실이 비즈니스에 심각한 영향을 미치는 경우
  • 워커 장애 시에도 작업이 재실행되어야 하는 경우

6. 우선순위가 다른 작업들을 처리해야 하는 경우

# 우선순위별 큐 분리
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_urgent_alert': {'queue': 'critical'},
    'myapp.tasks.process_order': {'queue': 'high'},
    'myapp.tasks.generate_report': {'queue': 'normal'},
    'myapp.tasks.cleanup_data': {'queue': 'low'},
}

# 워커를 우선순위별로 실행
# Critical: 즉시 처리 (concurrency=5)
celery -A myproject worker -Q critical --concurrency=5

# High: 빠른 처리 (concurrency=10)
celery -A myproject worker -Q high --concurrency=10

# Normal/Low: 여유 있을 때 처리 (concurrency=3)
celery -A myproject worker -Q normal,low --concurrency=3

적용 기준:

  • 긴급한 작업(알림)과 일반 작업(리포트)을 구분해야 하는 경우
  • 중요도에 따라 리소스를 차등 배분해야 하는 경우
  • 특정 작업이 다른 작업을 블로킹하지 않아야 하는 경우

도입하지 않아도 되는 경우

1. 작업이 간단하고 빠른 경우

  • 모든 작업이 100ms 이내에 완료되는 경우
  • 동기 처리로도 충분한 성능을 보이는 경우

2. 트래픽이 매우 낮은 경우

  • 일일 사용자가 수십 명 이하인 내부 도구
  • MVP 단계의 프로토타입 개발

3. 인프라 관리 부담이 큰 경우

  • 소규모 팀에서 추가 서버 관리가 어려운 경우
  • 클라우드 비용을 최소화해야 하는 경우
  • 대안: Django-Q (단순한 작업 큐), Huey (경량 작업 큐)

4. 실시간 결과가 필요한 경우

  • 사용자가 작업 결과를 즉시 확인해야 하는 경우
  • 작업 완료 전까지 다음 단계로 진행할 수 없는 경우
  • 대안: WebSocket, Server-Sent Events로 실시간 업데이트

Redis vs RabbitMQ 선택 기준

Redis를 선택해야 하는 경우:

  • 간단한 작업 큐가 필요한 경우
  • 높은 처리 속도가 중요한 경우
  • 작업 유실이 치명적이지 않은 경우
  • Redis를 이미 캐시로 사용하고 있는 경우
# Redis 설정: 간단하고 빠름
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

RabbitMQ (AMQP)를 선택해야 하는 경우:

  • 메시지 영속성이 중요한 경우 (결제, 주문 등)
  • 복잡한 라우팅이 필요한 경우
  • 높은 안정성과 신뢰성이 요구되는 경우
  • 대규모 분산 시스템을 구축하는 경우
# RabbitMQ 설정: 안정적이고 영속적
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_TASK_ACKS_LATE = True

단계적 도입 전략

Phase 1: 작은 규모로 시작

# 가장 시간이 오래 걸리는 1-2개 작업만 먼저 비동기로 전환
@shared_task
def send_email_task(subject, body, recipient):
    send_mail(subject, body, 'from@example.com', [recipient])

# 기존 코드 수정 최소화
def signup_view(request):
    # ... 회원가입 로직 ...
    send_email_task.delay('Welcome', 'Thank you', user.email)

Phase 2: 모니터링 및 최적화

# Flower로 작업 모니터링
celery -A myproject flower

# 성능 지표 확인
# - 작업 처리 속도
# - 실패율
# - 대기 큐 길이

Phase 3: 확장

# 더 많은 작업을 비동기로 전환
# 우선순위 큐 도입
# 워커 수 증가
# 모니터링 및 알림 체계 구축

비용 vs 효과 분석

도입 비용:

  • 인프라 비용: 메시지 브로커 서버, 워커 서버 (월 $50-200)
  • 개발 시간: 초기 설정 및 학습 (1-2주)
  • 운영 부담: 모니터링, 장애 대응 (주당 2-3시간)

기대 효과:

  • 응답 시간 개선: 2-5초 → 100-200ms (10-50배 향상)
  • 처리량 증가: 순차 처리 → 병렬 처리 (워커 수만큼 배수 증가)
  • 사용자 경험 향상: 즉각적인 피드백, 타임아웃 제거
  • 시스템 안정성: 작업 격리, 에러 재시도, 부하 분산

의사결정 체크리스트

다음 질문에 3개 이상 “예”라면 Celery + AMQP 도입을 고려하세요:

  • HTTP 요청 처리 시간이 1초를 초과하는 경우가 있는가?
  • 하루에 1,000개 이상의 백그라운드 작업이 필요한가?
  • 외부 API 호출이 전체 로직의 30% 이상을 차지하는가?
  • 주기적으로 실행해야 하는 cron 작업이 5개 이상인가?
  • 작업 실패 시 자동 재시도가 필요한가?
  • 작업 우선순위를 구분해야 하는가?
  • 팀에 인프라 관리 역량이 있는가?
  • 서비스 확장 계획이 있는가?

Django 프로젝트에 Celery 설정하기

1. 필요한 패키지 설치

pip install celery
pip install redis  # Redis를 브로커로 사용하는 경우
# 또는
pip install amqp  # RabbitMQ를 브로커로 사용하는 경우

2. Celery 설정 파일 생성

프로젝트 구조:

myproject/
├── myproject/
│   ├── __init__.py
│   ├── settings.py
│   ├── celery.py  # 새로 생성
│   └── urls.py
└── manage.py

myproject/celery.py:

import os
from celery import Celery

# Django 설정 모듈 지정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Django settings에서 CELERY_ 접두사를 가진 설정 로드
app.config_from_object('django.conf:settings', namespace='CELERY')

# 등록된 Django 앱에서 tasks.py를 자동으로 찾아 로드
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

myproject/init.py:

# Celery 앱이 Django와 함께 로드되도록 설정
from .celery import app as celery_app

__all__ = ('celery_app',)

3. Django settings.py 설정

# settings.py

# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 사용 시
# 또는
# CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # RabbitMQ 사용 시

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

# 작업 시간 제한 (초)
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30분
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60  # 25분

# 결과 저장 기간 (초)
CELERY_RESULT_EXPIRES = 3600  # 1시간

Celery Task 작성하기

기본 Task 예제

myapp/tasks.py:

from celery import shared_task
from django.core.mail import send_mail
from time import sleep

@shared_task
def send_email_task(subject, message, recipient_list):
    """이메일 발송 작업"""
    send_mail(
        subject,
        message,
        'from@example.com',
        recipient_list,
        fail_silently=False,
    )
    return f'Email sent to {len(recipient_list)} recipients'

@shared_task
def process_large_file(file_path):
    """대용량 파일 처리 작업"""
    # 파일 처리 로직
    sleep(10)  # 시뮬레이션
    return f'File {file_path} processed successfully'

@shared_task(bind=True, max_retries=3)
def retry_task(self, x, y):
    """재시도 가능한 작업"""
    try:
        result = x / y
        return result
    except ZeroDivisionError as exc:
        # 10초 후 재시도
        raise self.retry(exc=exc, countdown=10)

Django View에서 Task 호출

# views.py
from django.http import JsonResponse
from .tasks import send_email_task, process_large_file

def send_notification(request):
    # 비동기 작업 실행
    task = send_email_task.delay(
        'Welcome!',
        'Thank you for joining.',
        ['user@example.com']
    )

    return JsonResponse({
        'status': 'Task submitted',
        'task_id': task.id
    })

def process_file(request):
    file_path = request.POST.get('file_path')

    # 즉시 응답 반환하고 백그라운드에서 처리
    task = process_large_file.delay(file_path)

    return JsonResponse({
        'status': 'Processing started',
        'task_id': task.id
    })

대규모 작업 분산처리 패턴

1. Chunking (작업 분할)

대량의 데이터를 작은 단위로 나누어 처리:

from celery import group

@shared_task
def process_batch(items):
    """배치 단위 처리"""
    results = []
    for item in items:
        # 각 아이템 처리
        result = process_single_item(item)
        results.append(result)
    return results

def process_large_dataset(dataset):
    """대규모 데이터셋을 청크로 분할하여 처리"""
    chunk_size = 100
    chunks = [dataset[i:i+chunk_size] for i in range(0, len(dataset), chunk_size)]

    # 각 청크를 병렬로 처리
    job = group(process_batch.s(chunk) for chunk in chunks)
    result = job.apply_async()

    return result

2. Chain (작업 체인)

순차적으로 실행되어야 하는 작업들:

from celery import chain

@shared_task
def fetch_data(url):
    """데이터 가져오기"""
    # API 호출 등
    return data

@shared_task
def transform_data(data):
    """데이터 변환"""
    # 데이터 처리
    return transformed_data

@shared_task
def save_data(data):
    """데이터 저장"""
    # DB 저장
    return 'Saved'

# 체인으로 연결
workflow = chain(
    fetch_data.s('https://api.example.com/data'),
    transform_data.s(),
    save_data.s()
)
result = workflow.apply_async()

3. Chord (병렬 + 집계)

여러 작업을 병렬로 실행 후 결과를 집계:

from celery import chord

@shared_task
def process_item(item_id):
    """개별 아이템 처리"""
    # 처리 로직
    return result

@shared_task
def aggregate_results(results):
    """결과 집계"""
    total = sum(results)
    # 집계 결과 저장
    return total

# Chord 패턴: 병렬 처리 후 집계
callback = aggregate_results.s()
header = [process_item.s(i) for i in range(100)]
result = chord(header)(callback)

4. 우선순위 큐

중요도에 따른 작업 처리:

graph TB
    subgraph "Django Application"
        A1[긴급 알림 작업]
        A2[일반 작업]
        A3[배치 작업]
    end

    subgraph "Message Broker - Queue System"
        B1[High Priority Queue]
        B2[Default Queue]
        B3[Low Priority Queue]
    end

    subgraph "Worker Pool"
        C1[Worker 1-10<br/>concurrency=10]
        C2[Worker 11-30<br/>concurrency=20]
        C3[Worker 31-35<br/>concurrency=5]
    end

    A1 -->|urgent_task| B1
    A2 -->|normal_task| B2
    A3 -->|batch_task| B3

    B1 -->|즉시 처리| C1
    B2 -->|빠른 처리| C2
    B3 -->|여유 있을 때| C3

    style B1 fill:#ffcdd2
    style B2 fill:#fff9c4
    style B3 fill:#c8e6c9
    style C1 fill:#ffebee
    style C2 fill:#fffde7
    style C3 fill:#e8f5e9

코드 구현:

# settings.py
CELERY_TASK_ROUTES = {
    'myapp.tasks.urgent_task': {'queue': 'high_priority'},
    'myapp.tasks.normal_task': {'queue': 'default'},
    'myapp.tasks.batch_task': {'queue': 'low_priority'},
}

# tasks.py
@shared_task
def urgent_task():
    """긴급 작업"""
    pass

# Worker 실행 시 큐 지정
# celery -A myproject worker -Q high_priority
# celery -A myproject worker -Q default,low_priority

실전 예제: 대규모 앱 푸시 알림 시스템

실제 서비스에서 수십만~수백만 명의 사용자에게 푸시 알림을 발송해야 하는 경우를 생각해봅시다. 예를 들어, 이커머스 앱에서 특가 이벤트 알림, 게임 앱에서 업데이트 공지, 뉴스 앱에서 속보 전송 등의 상황입니다.

전체 워크플로우

graph TD
    A[Admin: 캠페인 생성] -->|API 호출| B[Django View]
    B -->|execute_push_campaign.delay| C[Celery Broker]
    C -->|작업 분배| D[Main Task: execute_push_campaign]
    D -->|1M 디바이스 조회| E[Database]
    D -->|500개씩 분할| F[2,000개 배치 생성]
    F -->|Chord 패턴| G{Worker Pool<br/>20-50 workers}

    G -->|배치 1-500| H1[send_push_batch]
    G -->|배치 501-1000| H2[send_push_batch]
    G -->|배치 N| H3[send_push_batch]

    H1 -->|FCM API| I[Firebase Cloud Messaging]
    H2 -->|FCM API| I
    H3 -->|FCM API| I

    I -->|푸시 발송| J[사용자 디바이스]

    H1 -->|결과| K[aggregate_campaign_results]
    H2 -->|결과| K
    H3 -->|결과| K

    K -->|통계 업데이트| E
    K -->|완료 알림| L[관리자 알림]

    style A fill:#e3f2fd
    style B fill:#fff3e0
    style C fill:#f3e5f5
    style D fill:#e8f5e9
    style F fill:#fce4ec
    style G fill:#fff9c4
    style I fill:#ffebee
    style K fill:#e0f2f1

시나리오

요구사항:

  • 100만 명의 사용자에게 동시에 푸시 알림 발송
  • FCM (Firebase Cloud Messaging) 사용
  • 발송 성공/실패 추적
  • 우선순위에 따른 처리 (긴급 공지 vs 일반 마케팅)
  • Rate limiting (FCM API 제한 준수)
  • 재시도 로직
  • 실시간 진행 상황 모니터링

데이터 모델 설계

# models.py
from django.db import models
from django.contrib.auth.models import User

class DeviceToken(models.Model):
    """사용자 디바이스 토큰"""
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='devices')
    token = models.CharField(max_length=255, unique=True)
    platform = models.CharField(max_length=10, choices=[
        ('ios', 'iOS'),
        ('android', 'Android'),
    ])
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        indexes = [
            models.Index(fields=['is_active', 'platform']),
        ]

class PushCampaign(models.Model):
    """푸시 캠페인"""
    PRIORITY_CHOICES = [
        ('high', '긴급'),
        ('normal', '일반'),
        ('low', '마케팅'),
    ]

    STATUS_CHOICES = [
        ('draft', '준비 중'),
        ('scheduled', '예약됨'),
        ('processing', '발송 중'),
        ('completed', '완료'),
        ('failed', '실패'),
    ]

    title = models.CharField(max_length=100)
    message = models.TextField()
    priority = models.CharField(max_length=10, choices=PRIORITY_CHOICES, default='normal')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='draft')

    # 발송 대상
    target_user_ids = models.JSONField(default=list, blank=True)  # 특정 사용자
    target_all = models.BooleanField(default=False)  # 전체 사용자

    # 통계
    total_recipients = models.IntegerField(default=0)
    sent_count = models.IntegerField(default=0)
    success_count = models.IntegerField(default=0)
    failure_count = models.IntegerField(default=0)

    # 메타데이터
    data_payload = models.JSONField(default=dict, blank=True)  # 추가 데이터
    scheduled_at = models.DateTimeField(null=True, blank=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)

class PushLog(models.Model):
    """푸시 발송 로그"""
    campaign = models.ForeignKey(PushCampaign, on_delete=models.CASCADE, related_name='logs')
    device_token = models.ForeignKey(DeviceToken, on_delete=models.CASCADE)

    STATUS_CHOICES = [
        ('pending', '대기 중'),
        ('sent', '발송됨'),
        ('failed', '실패'),
        ('invalid_token', '유효하지 않은 토큰'),
    ]

    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    error_message = models.TextField(blank=True)
    sent_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        indexes = [
            models.Index(fields=['campaign', 'status']),
        ]

Celery Task 구현

# notifications/tasks.py
from celery import shared_task, group, chord
from firebase_admin import messaging
from django.utils import timezone
from django.db.models import F
from .models import PushCampaign, DeviceToken, PushLog
import logging

logger = logging.getLogger(__name__)

# FCM API Rate Limit: 초당 600개 요청
BATCH_SIZE = 500  # 한 번에 처리할 디바이스 수
RATE_LIMIT = '600/m'  # 분당 600개 제한


@shared_task(bind=True, max_retries=3, rate_limit=RATE_LIMIT)
def send_push_batch(self, campaign_id, device_token_ids):
    """
    배치 단위로 푸시 알림 발송

    Args:
        campaign_id: 캠페인 ID
        device_token_ids: 디바이스 토큰 ID 리스트 (최대 500개)
    """
    try:
        campaign = PushCampaign.objects.get(id=campaign_id)
        device_tokens = DeviceToken.objects.filter(
            id__in=device_token_ids,
            is_active=True
        ).select_related('user')

        # FCM 메시지 구성
        messages = []
        token_map = {}  # 인덱스 -> device_token 매핑

        for idx, device in enumerate(device_tokens):
            token_map[idx] = device

            message = messaging.Message(
                notification=messaging.Notification(
                    title=campaign.title,
                    body=campaign.message,
                ),
                data=campaign.data_payload,
                token=device.token,
                android=messaging.AndroidConfig(
                    priority='high' if campaign.priority == 'high' else 'normal',
                ),
                apns=messaging.APNSConfig(
                    headers={
                        'apns-priority': '10' if campaign.priority == 'high' else '5',
                    },
                ),
            )
            messages.append(message)

        # FCM Batch 전송
        response = messaging.send_all(messages)

        # 결과 처리
        success_count = 0
        failure_count = 0
        invalid_tokens = []

        for idx, resp in enumerate(response.responses):
            device = token_map[idx]

            if resp.success:
                # 성공
                PushLog.objects.create(
                    campaign=campaign,
                    device_token=device,
                    status='sent',
                    sent_at=timezone.now()
                )
                success_count += 1
            else:
                # 실패
                error_code = resp.exception.code if resp.exception else 'unknown'

                # 유효하지 않은 토큰 처리
                if error_code in ['invalid-argument', 'registration-token-not-registered']:
                    invalid_tokens.append(device.id)
                    PushLog.objects.create(
                        campaign=campaign,
                        device_token=device,
                        status='invalid_token',
                        error_message=str(resp.exception)
                    )
                else:
                    PushLog.objects.create(
                        campaign=campaign,
                        device_token=device,
                        status='failed',
                        error_message=str(resp.exception)
                    )

                failure_count += 1

        # 유효하지 않은 토큰 비활성화
        if invalid_tokens:
            DeviceToken.objects.filter(id__in=invalid_tokens).update(is_active=False)

        # 캠페인 통계 업데이트
        campaign.sent_count = F('sent_count') + len(device_token_ids)
        campaign.success_count = F('success_count') + success_count
        campaign.failure_count = F('failure_count') + failure_count
        campaign.save(update_fields=['sent_count', 'success_count', 'failure_count'])

        logger.info(
            f'Batch sent for campaign {campaign_id}: '
            f'{success_count} success, {failure_count} failed'
        )

        return {
            'campaign_id': campaign_id,
            'total': len(device_token_ids),
            'success': success_count,
            'failure': failure_count,
        }

    except Exception as exc:
        logger.error(f'Error sending push batch: {exc}')
        # 재시도 (최대 3번)
        raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))


@shared_task
def aggregate_campaign_results(results, campaign_id):
    """
    모든 배치 작업 완료 후 최종 집계

    Args:
        results: 각 배치 작업의 결과 리스트
        campaign_id: 캠페인 ID
    """
    campaign = PushCampaign.objects.get(id=campaign_id)

    # 최종 상태 업데이트
    campaign.status = 'completed'
    campaign.completed_at = timezone.now()
    campaign.save(update_fields=['status', 'completed_at'])

    # 통계 로깅
    logger.info(
        f'Campaign {campaign_id} completed: '
        f'{campaign.success_count}/{campaign.total_recipients} sent successfully'
    )

    # 관리자에게 완료 알림 (선택사항)
    send_admin_notification.delay(campaign_id)

    return {
        'campaign_id': campaign_id,
        'status': 'completed',
        'total': campaign.total_recipients,
        'success': campaign.success_count,
        'failure': campaign.failure_count,
    }


@shared_task
def send_admin_notification(campaign_id):
    """관리자에게 캠페인 완료 알림"""
    campaign = PushCampaign.objects.get(id=campaign_id)

    # 이메일이나 슬랙 알림 발송
    # send_email(...) or send_slack_message(...)

    logger.info(f'Admin notification sent for campaign {campaign_id}')


@shared_task
def execute_push_campaign(campaign_id):
    """
    푸시 캠페인 실행 (메인 오케스트레이터)

    이 태스크는 대규모 푸시 발송을 다음과 같이 처리합니다:
    1. 대상 사용자의 디바이스 토큰 조회
    2. 배치 단위로 분할 (500개씩)
    3. 각 배치를 병렬로 처리
    4. 모든 배치 완료 후 결과 집계
    """
    try:
        campaign = PushCampaign.objects.get(id=campaign_id)

        # 상태 업데이트
        campaign.status = 'processing'
        campaign.started_at = timezone.now()
        campaign.save(update_fields=['status', 'started_at'])

        # 대상 디바이스 토큰 조회
        if campaign.target_all:
            # 전체 사용자
            device_tokens = DeviceToken.objects.filter(is_active=True)
        else:
            # 특정 사용자
            device_tokens = DeviceToken.objects.filter(
                user_id__in=campaign.target_user_ids,
                is_active=True
            )

        # 총 수신자 수 업데이트
        total_devices = device_tokens.count()
        campaign.total_recipients = total_devices
        campaign.save(update_fields=['total_recipients'])

        if total_devices == 0:
            campaign.status = 'completed'
            campaign.completed_at = timezone.now()
            campaign.save(update_fields=['status', 'completed_at'])
            return {'message': 'No recipients found'}

        # 디바이스 토큰 ID를 배치로 분할
        device_ids = list(device_tokens.values_list('id', flat=True))
        batches = [
            device_ids[i:i + BATCH_SIZE]
            for i in range(0, len(device_ids), BATCH_SIZE)
        ]

        logger.info(
            f'Starting campaign {campaign_id}: '
            f'{total_devices} devices in {len(batches)} batches'
        )

        # Chord 패턴: 모든 배치를 병렬로 처리하고 결과 집계
        callback = aggregate_campaign_results.s(campaign_id)
        header = group(
            send_push_batch.s(campaign_id, batch)
            for batch in batches
        )

        # 작업 실행
        chord(header)(callback)

        return {
            'campaign_id': campaign_id,
            'total_recipients': total_devices,
            'batches': len(batches),
            'status': 'processing'
        }

    except PushCampaign.DoesNotExist:
        logger.error(f'Campaign {campaign_id} not found')
        raise
    except Exception as exc:
        logger.error(f'Error executing campaign {campaign_id}: {exc}')
        campaign.status = 'failed'
        campaign.save(update_fields=['status'])
        raise

Django View & API 구현

# notifications/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAdminUser
from django.utils import timezone
from .models import PushCampaign
from .tasks import execute_push_campaign
from .serializers import PushCampaignSerializer


class PushCampaignViewSet(viewsets.ModelViewSet):
    """푸시 캠페인 관리 API"""
    queryset = PushCampaign.objects.all()
    serializer_class = PushCampaignSerializer
    permission_classes = [IsAdminUser]

    def perform_create(self, serializer):
        """캠페인 생성"""
        serializer.save(created_by=self.request.user)

    @action(detail=True, methods=['post'])
    def send(self, request, pk=None):
        """
        캠페인 발송 시작

        POST /api/campaigns/{id}/send/
        """
        campaign = self.get_object()

        # 상태 검증
        if campaign.status not in ['draft', 'scheduled']:
            return Response(
                {'error': 'Campaign already sent or in progress'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # 우선순위에 따라 적절한 큐로 라우팅
        queue_name = {
            'high': 'high_priority',
            'normal': 'default',
            'low': 'low_priority',
        }.get(campaign.priority, 'default')

        # 비동기 작업 실행
        task = execute_push_campaign.apply_async(
            args=[campaign.id],
            queue=queue_name
        )

        return Response({
            'campaign_id': campaign.id,
            'task_id': task.id,
            'status': 'Campaign sending started',
            'priority': campaign.priority,
            'queue': queue_name,
        })

    @action(detail=True, methods=['get'])
    def stats(self, request, pk=None):
        """
        캠페인 통계 조회

        GET /api/campaigns/{id}/stats/
        """
        campaign = self.get_object()

        # 진행률 계산
        progress = 0
        if campaign.total_recipients > 0:
            progress = (campaign.sent_count / campaign.total_recipients) * 100

        return Response({
            'campaign_id': campaign.id,
            'status': campaign.status,
            'total_recipients': campaign.total_recipients,
            'sent': campaign.sent_count,
            'success': campaign.success_count,
            'failure': campaign.failure_count,
            'progress': round(progress, 2),
            'started_at': campaign.started_at,
            'completed_at': campaign.completed_at,
        })

    @action(detail=False, methods=['get'])
    def active(self, request):
        """
        진행 중인 캠페인 목록

        GET /api/campaigns/active/
        """
        active_campaigns = self.queryset.filter(
            status='processing'
        ).order_by('-started_at')

        serializer = self.get_serializer(active_campaigns, many=True)
        return Response(serializer.data)

Settings 설정

# settings.py

# Celery Task Routing (우선순위별 큐)
CELERY_TASK_ROUTES = {
    'notifications.tasks.send_push_batch': {
        'queue': 'default',
        'routing_key': 'push.batch',
    },
    'notifications.tasks.execute_push_campaign': {
        'queue': 'default',
        'routing_key': 'push.campaign',
    },
}

# Rate Limiting 설정
CELERY_TASK_ANNOTATIONS = {
    'notifications.tasks.send_push_batch': {
        'rate_limit': '600/m',  # FCM API 제한 준수
    },
}

# Redis를 사용한 결과 저장
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_RESULT_EXPIRES = 3600  # 1시간

# Worker 동시성 설정
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # 메모리 누수 방지

Worker 실행 전략

대규모 푸시 발송을 위한 워커 구성:

# 고우선순위 큐 워커 (긴급 공지용)
celery -A myproject worker \
    -Q high_priority \
    --concurrency=10 \
    -n worker-high@%h \
    -l info

# 일반 우선순위 큐 워커 (일반 알림용)
celery -A myproject worker \
    -Q default \
    --concurrency=20 \
    -n worker-normal@%h \
    -l info

# 저우선순위 큐 워커 (마케팅용)
celery -A myproject worker \
    -Q low_priority \
    --concurrency=5 \
    -n worker-low@%h \
    -l info

# Autoscale로 동적 조정 (피크 시간 대비)
celery -A myproject worker \
    -Q default \
    --autoscale=50,10 \
    -n worker-autoscale@%h \
    -l info

사용 예시

# 관리자 페이지나 스크립트에서 캠페인 생성 및 발송

# 1. 전체 사용자 대상 긴급 공지
campaign = PushCampaign.objects.create(
    title='긴급 시스템 점검 안내',
    message='오늘 오후 2시부터 30분간 시스템 점검이 진행됩니다.',
    priority='high',
    target_all=True,
    created_by=admin_user,
)
execute_push_campaign.delay(campaign.id)

# 2. 특정 사용자 대상 마케팅
vip_users = User.objects.filter(membership='VIP').values_list('id', flat=True)
campaign = PushCampaign.objects.create(
    title='VIP 회원 특별 할인',
    message='오늘만 50% 할인! 지금 바로 확인하세요.',
    priority='low',
    target_user_ids=list(vip_users),
    data_payload={'type': 'promotion', 'discount': 50},
    created_by=admin_user,
)
execute_push_campaign.delay(campaign.id)

# 3. API를 통한 발송 (REST API 호출)
# POST /api/campaigns/{id}/send/
# GET /api/campaigns/{id}/stats/

성능 최적화 팁

1. Database 최적화:

# Bulk Create로 로그 생성
logs = [
    PushLog(campaign=campaign, device_token=device, status='sent')
    for device in devices
]
PushLog.objects.bulk_create(logs, batch_size=1000)

# Select Related/Prefetch Related 활용
device_tokens = DeviceToken.objects.filter(
    user_id__in=user_ids
).select_related('user').only('id', 'token', 'platform')

2. 메모리 관리:

# Iterator를 사용하여 대량 쿼리 메모리 절약
for device in DeviceToken.objects.filter(is_active=True).iterator(chunk_size=1000):
    process_device(device)

3. 모니터링:

# Flower로 실시간 모니터링
# http://localhost:5555

# Custom metrics (Prometheus + Grafana)
from prometheus_client import Counter, Histogram

push_sent_counter = Counter('push_notifications_sent', 'Push notifications sent')
push_duration = Histogram('push_batch_duration', 'Push batch processing duration')

예상 처리량 계산

시나리오: 100만 명 사용자에게 푸시 발송

  • 배치 크기: 500개
  • 총 배치 수: 2,000개
  • 워커 수: 20개 (concurrency=20)
  • Rate Limit: 분당 600개 (초당 10개)
  • 예상 처리 시간:
    • 이론적: (1,000,000 / 600) = 약 1,667분 = 27.8시간
    • 실제 (20 워커 병렬): 약 1.5~2시간
    • 최적화 (50 워커 병렬): 약 30~40분

처리량 향상 방법:

  • 워커 수 증가 (horizontal scaling)
  • 여러 FCM 프로젝트 사용 (rate limit 분산)
  • 배치 크기 최적화
  • 네트워크 I/O 최적화

이 구현은 실제 프로덕션 환경에서 대규모 푸시 알림을 안정적으로 발송할 수 있는 견고한 시스템을 제공합니다.

AMQP와 메시지 브로커

RabbitMQ 설정

RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 구현한 메시지 브로커입니다.

설치 및 실행:

# macOS
brew install rabbitmq
brew services start rabbitmq

# Ubuntu
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server

Django 설정:

# settings.py
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'rpc://'

# RabbitMQ 설정 상세
CELERY_BROKER_CONNECTION_RETRY = True
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

Redis vs RabbitMQ

특징 Redis RabbitMQ
프로토콜 Redis Protocol AMQP
메시지 영속성 제한적 강력함
성능 매우 빠름 빠름
복잡한 라우팅 제한적 강력함
메모리 사용 높음 낮음
사용 사례 간단한 작업 큐 복잡한 메시징 패턴

Celery Worker 실행 및 관리

Worker 실행

# 기본 실행
celery -A myproject worker -l info

# 동시성 설정 (워커 프로세스 수)
celery -A myproject worker -l info --concurrency=4

# 특정 큐만 처리
celery -A myproject worker -Q high_priority,default -l info

# Autoscale (최소-최대 워커 수)
celery -A myproject worker --autoscale=10,3 -l info

Celery Beat (주기적 작업)

# Beat 스케줄러 실행
celery -A myproject beat -l info

주기적 작업 설정:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-report': {
        'task': 'myapp.tasks.generate_daily_report',
        'schedule': crontab(hour=9, minute=0),  # 매일 오전 9시
    },
    'cleanup-old-data': {
        'task': 'myapp.tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week=1),  # 매주 월요일 오전 2시
    },
    'process-pending-jobs': {
        'task': 'myapp.tasks.process_pending_jobs',
        'schedule': 300.0,  # 5분마다
    },
}

모니터링 및 관리

Flower를 이용한 모니터링

# Flower 설치
pip install flower

# Flower 실행
celery -A myproject flower
# http://localhost:5555 에서 접근

Flower 기능:

  • 실시간 작업 모니터링
  • 워커 상태 확인
  • 작업 통계 및 그래프
  • 작업 재시도/취소

작업 상태 확인

from celery.result import AsyncResult

def check_task_status(task_id):
    result = AsyncResult(task_id)

    return {
        'task_id': task_id,
        'status': result.status,
        'result': result.result if result.ready() else None,
        'traceback': result.traceback if result.failed() else None,
    }

프로덕션 배포 고려사항

1. Supervisor를 이용한 프로세스 관리

; /etc/supervisor/conf.d/celery.conf
[program:celery-worker]
command=/path/to/venv/bin/celery -A myproject worker -l info
directory=/path/to/project
user=www-data
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker_error.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600

[program:celery-beat]
command=/path/to/venv/bin/celery -A myproject beat -l info
directory=/path/to/project
user=www-data
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat_error.log
autostart=true
autorestart=true
startsecs=10

2. 로깅 설정

# settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'celery': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/celery/celery.log',
            'maxBytes': 1024 * 1024 * 10,  # 10MB
            'backupCount': 5,
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery'],
            'level': 'INFO',
        },
    },
}

3. 보안 설정

# settings.py

# 작업 직렬화 제한 (pickle 사용 금지)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 브로커 연결 보안
CELERY_BROKER_URL = 'amqps://user:password@hostname:5671//'
CELERY_BROKER_USE_SSL = {
    'keyfile': '/path/to/key.pem',
    'certfile': '/path/to/cert.pem',
    'ca_certs': '/path/to/ca.pem',
}

Key Points

  • Celery는 Django와 함께 사용하여 시간이 오래 걸리는 작업을 비동기로 처리할 수 있는 강력한 도구입니다
  • Message Broker (Redis, RabbitMQ)를 통해 작업을 큐에 저장하고 워커가 처리합니다
  • Chunking, Chain, Chord 등의 패턴을 활용하여 대규모 작업을 효율적으로 분산 처리할 수 있습니다
  • 대규모 푸시 알림 시스템과 같은 실전 예제에서 100만 명 이상의 사용자에게 효율적으로 메시지를 발송할 수 있습니다
  • 우선순위 큐와 Rate Limiting을 활용하여 외부 API 제약사항을 준수하면서도 최적의 성능을 달성할 수 있습니다
  • Celery Beat을 사용하여 주기적인 작업을 스케줄링할 수 있습니다
  • Flower를 통해 작업 실행 상태를 실시간으로 모니터링할 수 있습니다
  • 프로덕션 환경에서는 Supervisor, 로깅, 보안 설정을 적절히 구성해야 합니다
  • Database 최적화 (bulk_create, select_related)와 메모리 관리가 대규모 처리의 핵심입니다

Conclusion

Django와 Celery를 활용하면 웹 애플리케이션의 성능과 사용자 경험을 크게 향상시킬 수 있습니다. 시간이 오래 걸리는 작업을 백그라운드로 분리함으로써 빠른 응답 시간을 유지하고, 여러 워커를 통한 분산 처리로 대규모 작업도 효율적으로 처리할 수 있습니다.

Celery를 도입할 때는 작업의 특성에 맞는 메시지 브로커를 선택하고, 적절한 작업 분산 패턴을 적용하며, 모니터링 체계를 구축하는 것이 중요합니다.

Next Steps

  • [Django REST Framework와 비동기 API 구현]
  • [RabbitMQ 고급 기능과 메시지 패턴]
  • [Celery 성능 최적화 및 튜닝]
  • [분산 시스템 아키텍처 설계]