## Prerequisites

- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or your preferred IDE
## What You'll Learn

- Understand the fundamentals of background task processing
- Apply background tasks in real projects
- Debug common issues
- Write clean, Pythonic code
## Introduction

Welcome to this exciting tutorial on background tasks with Celery! In this guide, we'll explore how to handle time-consuming operations without making your users wait.

Ever wondered how Netflix processes your video uploads, or how your favorite e-commerce site sends order confirmation emails instantly? The secret is background tasks! You'll discover how Celery can transform your Flask and Django applications into powerhouses that handle heavy work behind the scenes.

By the end of this tutorial, you'll feel confident implementing background tasks in your own projects. Let's dive in!
## Understanding Background Tasks with Celery

### What Are Background Tasks?

Background tasks are like having a personal assistant: you delegate time-consuming work to someone else while you continue serving your customers. Your web app stays responsive while the heavy lifting happens behind the scenes.

In Python terms, background tasks let you offload slow operations to separate worker processes. This means you can:

- Keep your web app fast and responsive
- Process multiple tasks simultaneously
- Handle failures gracefully with retries
### Why Use Celery?

Here's why developers love Celery:

- **Battle-tested**: used by Instagram, Mozilla, and more
- **Flexible**: works with Flask, Django, and plain Python
- **Reliable**: built-in retries and error handling
- **Scalable**: add more workers as you grow

Real-world example: imagine building an online store. With Celery, you can process orders instantly while emails, inventory updates, and analytics happen in the background!
## Basic Syntax and Usage

### Setting Up Celery

Let's start with a friendly example:

```python
from celery import Celery

# Create our Celery app, using Redis as the message broker
app = Celery('tasks', broker='redis://localhost:6379')

# Define a simple task
@app.task
def send_welcome_email(user_email):
    # Simulate sending an email
    print(f"Sending welcome email to {user_email}!")
    return f"Email sent to {user_email}"
```
**Explanation:** The `@app.task` decorator turns any function into a background task. Redis acts as our message broker: think of it as a to-do list for our workers.
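Nothing runs until a broker and at least one worker are up. Assuming Redis is installed locally and the code above lives in a file named `tasks.py` (both assumptions for illustration), a typical startup looks like this:

```shell
# Start Redis (the broker) if it isn't already running
redis-server --daemonize yes

# Start a Celery worker that imports the app from tasks.py
celery -A tasks worker --loglevel=info

# In another shell, queue a task from a Python session:
# >>> from tasks import send_welcome_email
# >>> send_welcome_email.delay('user@example.com')
```

Calling `.delay(...)` returns immediately with an `AsyncResult`; the worker picks the task up from Redis and executes it in its own process.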
### Common Patterns

Here are patterns you'll use daily:

```python
# Pattern 1: Flask integration
from flask import Flask
from celery import Celery

flask_app = Flask(__name__)
celery_app = Celery(flask_app.name, broker='redis://localhost:6379')

# Pattern 2: Task with retry logic
@celery_app.task(bind=True, max_retries=3)
def process_image(self, image_path):
    try:
        # Process the image
        result = resize_image(image_path)
        return result
    except Exception as exc:
        # Retry if something goes wrong, waiting 60 seconds between attempts
        raise self.retry(exc=exc, countdown=60)

# Pattern 3: Scheduled tasks with Celery Beat
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=9, minute=0),  # every day at 9 AM
    },
}
```
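The `countdown=60` in the retry pattern above waits a fixed interval between attempts. A common refinement is exponential backoff, where the wait doubles on each attempt up to a cap. Here is a minimal sketch of that schedule as plain Python (the base of 60 s and cap of 600 s are illustrative assumptions, not Celery defaults):

```python
def backoff_schedule(max_retries, base=60, cap=600):
    """Seconds to wait before each retry: base, 2*base, 4*base, ... capped."""
    return [min(base * 2 ** attempt, cap) for attempt in range(max_retries)]

print(backoff_schedule(4))  # → [60, 120, 240, 480]
```

You would feed each value to `countdown=` based on `self.request.retries`; recent Celery versions can also do this for you via task options such as `autoretry_for` and `retry_backoff`.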
## Practical Examples

### Example 1: E-commerce Order Processing

Let's build something real:

```python
# Order processing system
from celery import Celery, group

app = Celery('shop', broker='redis://localhost:6379')

# Order processing task
@app.task
def process_order(order_id, customer_email, items):
    print(f"Processing order #{order_id}")
    # Calculate the total
    total = sum(item['price'] * item['quantity'] for item in items)
    # Run the follow-up tasks in parallel
    workflow = group(
        update_inventory.s(items),
        send_order_email.s(customer_email, order_id),
        notify_warehouse.s(order_id),
        update_analytics.s(order_id, total)
    )
    workflow.apply_async()
    return f"Order #{order_id} processed! Total: ${total:.2f}"

# Inventory update
@app.task
def update_inventory(items):
    for item in items:
        print(f"Reducing stock for {item['name']} by {item['quantity']}")
    return "Inventory updated!"

# Email notification
@app.task
def send_order_email(customer_email, order_id):
    print(f"Sending confirmation to {customer_email}")
    # Simulate email sending
    return f"Email sent for order #{order_id}!"

# Warehouse notification
@app.task
def notify_warehouse(order_id):
    print(f"Notifying warehouse about order #{order_id}")
    return "Warehouse notified!"

# Analytics update
@app.task
def update_analytics(order_id, total):
    print(f"Recording sale: Order #{order_id}, Amount: ${total}")
    return "Analytics updated!"

# Let's use it!
if __name__ == '__main__':
    items = [
        {'name': 'Python Book', 'price': 29.99, 'quantity': 2},
        {'name': 'Coffee Mug', 'price': 12.99, 'quantity': 1}
    ]
    result = process_order.delay(
        order_id='ORD-001',
        customer_email='customer@example.com',
        items=items
    )
    print(f"Task ID: {result.id}")
```

**Try it yourself:** add a discount calculation task and a priority shipping notification!
### Example 2: Django Social Media Post Processor

Let's make it fun with Django:

```python
# Social media post processor
from io import BytesIO

import requests
from celery import shared_task
from django.core.cache import cache
from PIL import Image

# Image processing task
@shared_task(bind=True)
def process_social_post(self, post_id, image_url, caption):
    print(f"Processing post #{post_id}")
    try:
        # Download the image
        response = requests.get(image_url)
        img = Image.open(BytesIO(response.content))

        # Create multiple versions
        versions = {
            'thumbnail': (150, 150),
            'mobile': (480, 480),
            'desktop': (1080, 1080)
        }
        processed_urls = {}
        for version_name, size in versions.items():
            # Resize the image
            resized = img.resize(size, Image.LANCZOS)
            url = save_to_storage(resized, f"{post_id}_{version_name}")
            processed_urls[version_name] = url
            print(f"Created {version_name} version!")

        # Queue hashtag extraction and sentiment analysis as separate tasks
        extract_hashtags.delay(caption)
        analyze_sentiment.delay(caption)

        # Cache the results for an hour
        cache.set(f"post_{post_id}_urls", processed_urls, 3600)
        return {
            'status': 'success',
            'urls': processed_urls,
            'message': f'Post #{post_id} processed!'
        }
    except Exception as exc:
        # Retry on failure
        print(f"Error processing post: {exc}")
        raise self.retry(exc=exc, countdown=30, max_retries=3)

# Hashtag extraction
@shared_task
def extract_hashtags(caption):
    import re
    hashtags = re.findall(r'#\w+', caption)
    print(f"Found hashtags: {hashtags}")
    return hashtags

# Sentiment analysis
@shared_task
def analyze_sentiment(caption):
    # Simulate sentiment analysis
    if any(word in caption.lower() for word in ['love', 'amazing', 'great']):
        sentiment = 'positive'
    elif any(word in caption.lower() for word in ['hate', 'terrible', 'bad']):
        sentiment = 'negative'
    else:
        sentiment = 'neutral'
    print(f"Sentiment: {sentiment}")
    return sentiment

# Django view example
from django.http import JsonResponse
from django.views import View

class CreatePostView(View):
    def post(self, request):
        # Get the post data
        image_url = request.POST.get('image_url')
        caption = request.POST.get('caption')
        # Queue it for processing
        task = process_social_post.delay(
            post_id=generate_post_id(),
            image_url=image_url,
            caption=caption
        )
        return JsonResponse({
            'message': 'Post queued for processing!',
            'task_id': task.id,
            'status_url': f'/api/task-status/{task.id}/'
        })
```
## Advanced Concepts

### Advanced Topic 1: Task Workflows

When you're ready to level up, try task chains, groups, and chords:

```python
# Advanced workflow patterns
from celery import chain, group, chord

# Chain: run tasks sequentially, passing each result to the next
workflow = chain(
    fetch_data.s(user_id),
    process_data.s(),
    generate_report.s(),
    send_report.s()
)

# Group: run tasks in parallel
parallel_tasks = group(
    resize_image.s(img_id, 'small'),
    resize_image.s(img_id, 'medium'),
    resize_image.s(img_id, 'large')
)

# Chord: run a group in parallel, then a callback with all the results
@app.task
def aggregate_results(results):
    return f"Processed {len(results)} items!"

chord_workflow = chord(
    (process_item.s(item) for item in items),
    aggregate_results.s()
)
```
### Advanced Topic 2: Monitoring and Rate Limiting

For production-ready apps:

```python
# Rate limiting: at most 10 executions per minute (enforced per worker)
@app.task(rate_limit='10/m')
def send_notification(user_id, message):
    print(f"Sending notification to user {user_id}")
    return "Notification sent!"

# Task progress monitoring
@app.task(bind=True)
def monitored_task(self, data):
    total = len(data)
    for i, item in enumerate(data):
        process_item(item)
        # Report progress to the result backend
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': total}
        )
    return f"Processed {total} items!"

# Error handling with custom states
@app.task(bind=True)
def robust_task(self, risky_data):
    try:
        return risky_operation(risky_data)
    except ValidationError as exc:
        # Record a custom error state before re-raising
        self.update_state(
            state='VALIDATION_ERROR',
            meta={'error': str(exc)}
        )
        raise
```
## Common Pitfalls and Solutions

### Pitfall 1: Memory Bloat from Large Task Arguments

```python
import pandas as pd

# ❌ Wrong way: passing large objects as task arguments!
@app.task
def process_huge_data(massive_dataframe):
    # The whole DataFrame gets serialized into the message queue!
    return massive_dataframe.mean()

# ✅ Correct way: pass references instead!
@app.task
def process_huge_data(file_path):
    # Load the data inside the task
    df = pd.read_csv(file_path)
    return df.mean()
```
### Pitfall 2: Task Result Backend Overflow

```python
# ❌ Dangerous: storing large results in the result backend!
@app.task
def generate_report():
    huge_report = create_massive_report()  # a 100 MB report!
    return huge_report

# ✅ Safe: store a reference instead!
@app.task
def generate_report():
    report_path = create_and_save_report()  # save to file storage
    return {
        'status': 'complete',
        'report_url': f'/reports/{report_path}',
        'size_mb': get_file_size(report_path)
    }
```
## Best Practices

- **Keep tasks atomic**: each task should do one thing well
- **Use task names**: give tasks descriptive names for monitoring
- **Set time limits**: prevent tasks from running forever
- **Use result backends wisely**: don't store large results
- **Handle failures gracefully**: always plan for retries
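Several of these practices map directly onto Celery configuration options. The snippet below is a minimal sketch of a settings mapping; the specific values are illustrative assumptions for a typical web app, not recommendations for every workload, and in a real project you would apply it with `app.conf.update(**CELERY_CONFIG)`:

```python
# Illustrative Celery settings implementing the practices above
CELERY_CONFIG = {
    # Set time limits: hard kill at 300 s, soft timeout (raises
    # SoftTimeLimitExceeded inside the task) at 240 s
    "task_time_limit": 300,
    "task_soft_time_limit": 240,
    # Use result backends wisely: expire stored results after one hour
    "result_expires": 3600,
    # Handle failures gracefully: acknowledge tasks only after completion,
    # so a crashed worker's tasks are redelivered to another worker
    "task_acks_late": True,
    # Fetch one task at a time per worker process for fairer scheduling
    "worker_prefetch_multiplier": 1,
}
```

The soft limit should always be lower than the hard limit so the task gets a chance to clean up before being killed.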
## Hands-On Exercise

### Challenge: Build a Video Processing Pipeline

Create a background task system for video processing.

**Requirements:**

- Accept a video upload and queue it for processing
- Extract video metadata (duration, resolution)
- Generate multiple quality versions (480p, 720p, 1080p)
- Create thumbnails at specific timestamps
- Apply a watermark to all versions

**Bonus points:**

- Add progress tracking for long videos
- Implement a priority queue for premium users
- Send an email notification when complete
### Solution

```python
# Video processing pipeline
import os

from celery import Celery, chain, group
from moviepy.editor import VideoFileClip

app = Celery('video_processor', broker='redis://localhost:6379')

# Main video processing task
@app.task
def process_video(video_id, video_path, user_email):
    print(f"Processing video {video_id}")
    # Extract metadata in its own task
    extract_metadata.delay(video_path)
    # Build the processing workflow.
    # .si() creates immutable signatures, so the previous step's result
    # is not injected as an extra argument.
    workflow = chain(
        validate_video.s(video_path),
        group(
            create_quality_version.si(video_path, '480p'),
            create_quality_version.si(video_path, '720p'),
            create_quality_version.si(video_path, '1080p'),
        ),
        generate_thumbnails.si(video_path),
        apply_watermark.si(video_id),
        notify_completion.si(user_email, video_id)
    )
    result = workflow.apply_async()
    return result.id

# Metadata extraction
@app.task
def extract_metadata(video_path):
    clip = VideoFileClip(video_path)
    metadata = {
        'duration': clip.duration,
        'fps': clip.fps,
        'resolution': f"{clip.w}x{clip.h}",
        'size_mb': os.path.getsize(video_path) / (1024 * 1024)
    }
    print(f"Metadata: {metadata}")
    return metadata

# Video validation
@app.task
def validate_video(video_path):
    clip = VideoFileClip(video_path)
    if clip.duration > 3600:  # 1 hour limit
        raise ValueError("Video too long! Max 1 hour")
    return True

# Quality version creation
@app.task(bind=True, max_retries=3)
def create_quality_version(self, video_path, quality):
    resolutions = {
        '480p': (854, 480),
        '720p': (1280, 720),
        '1080p': (1920, 1080)
    }
    try:
        clip = VideoFileClip(video_path)
        height = resolutions[quality][1]
        # Resize the video, preserving aspect ratio
        resized = clip.resize(height=height)
        output_path = f"{video_path}_{quality}.mp4"
        resized.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            logger=None
        )
        print(f"Created {quality} version!")
        return output_path
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# Thumbnail generation
@app.task
def generate_thumbnails(video_path):
    clip = VideoFileClip(video_path)
    timestamps = [0, clip.duration / 4, clip.duration / 2, clip.duration * 3 / 4]
    thumbnails = []
    for i, timestamp in enumerate(timestamps):
        thumbnail_path = f"{video_path}_thumb_{i}.jpg"
        clip.save_frame(thumbnail_path, t=timestamp)
        thumbnails.append(thumbnail_path)
        print(f"Generated thumbnail {i + 1}/{len(timestamps)}")
    return thumbnails

# Watermark application
@app.task
def apply_watermark(video_id):
    # Simulate watermarking
    print(f"Applying watermark to all versions of {video_id}")
    return "Watermark applied!"

# Completion notification
@app.task
def notify_completion(user_email, video_id):
    print(f"Sending completion email to {user_email}")
    return f"Video {video_id} processing complete!"

# Usage example
if __name__ == '__main__':
    task = process_video.delay(
        video_id='VID-001',
        video_path='/uploads/awesome_video.mp4',
        user_email='creator@example.com'
    )
    print(f"Processing started! Task ID: {task.id}")
```
## Key Takeaways

You've learned a lot! Here's what you can now do:

- Create background tasks with Celery confidently
- Integrate Celery with Flask and Django seamlessly
- Build task workflows for complex operations
- Handle failures gracefully with retries
- Scale your applications with worker processes

Remember: background tasks are your secret weapon for building responsive, scalable applications!
## Next Steps

Congratulations! You've mastered the basics of background tasks with Celery.

Here's what to do next:

- Practice with the video processing exercise
- Add Celery to an existing Flask or Django project
- Explore Celery Beat for scheduled tasks
- Learn about task monitoring with Flower
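Flower gives you a web dashboard showing workers, queues, and task history. A minimal sketch of getting it running (assuming your Celery app lives in `tasks.py`, as in the earlier examples):

```shell
# Install Flower and launch its dashboard against your app
pip install flower
celery -A tasks flower --port=5555
# Then open http://localhost:5555 to watch workers and tasks in real time
```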
Remember: every scalable app started with its first background task. Keep coding, keep learning, and most importantly, have fun!

Happy coding!