Part 310 of 365

📘 FP Project: Data Processing Pipeline

Master building a functional programming data processing pipeline in Python, with practical examples, best practices, and real-world applications 🚀

💎 Advanced
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand functional pipeline fundamentals 🎯
  • Apply functional pipelines in real projects 🏗️
  • Debug common pipeline issues 🐛
  • Write clean, Pythonic code ✨

🎯 Introduction

Welcome to this exciting tutorial on building a functional programming data processing pipeline! 🎉 In this guide, we'll explore how to create a powerful, composable data pipeline using functional programming principles in Python.

You'll discover how functional programming can transform your data processing workflows. Whether you're analyzing datasets 📊, transforming API responses 🌐, or building ETL pipelines 🔄, understanding FP pipelines is essential for writing clean, maintainable, and scalable code.

By the end of this tutorial, you'll have built a complete data processing pipeline that you can adapt for your own projects! Let's dive in! 🏊‍♂️

📚 Understanding FP Data Pipelines

🤔 What is a Functional Data Pipeline?

A functional data pipeline is like a factory assembly line 🏭. Think of it as a series of stations where each station performs one specific transformation on your data, passing the result to the next station.

In Python terms, it's a chain of pure functions that transform data step by step. This means you can:

  • ✨ Compose simple functions into complex workflows
  • 🚀 Process data without side effects
  • 🛡️ Test each transformation independently

💡 Why Use Functional Pipelines?

Here's why developers love FP pipelines:

  1. Composability 🔗: Build complex operations from simple functions
  2. Testability 🧪: Each function is isolated and easy to test
  3. Readability 📖: The flow of data is clear and explicit
  4. Reusability ♻️: Functions can be reused in different pipelines

Real-world example: Imagine processing user activity logs 📝. With FP pipelines, you can filter, transform, aggregate, and export data in a clean, modular way.
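
To make that concrete, here is a tiny sketch with made-up activity-log records: each stage is a small pure function, and chaining them gives you the pipeline.

# 📝 Hypothetical activity-log records
logs = [
    {"user": "alice", "action": "login", "duration": 12},
    {"user": "bob", "action": "error", "duration": 0},
    {"user": "alice", "action": "search", "duration": 45},
]

# Each step is a small pure function: filter, transform, aggregate
def keep_successful(entries):
    return [e for e in entries if e["action"] != "error"]

def extract_durations(entries):
    return [e["duration"] for e in entries]

def total_duration(durations):
    return sum(durations)

print(total_duration(extract_durations(keep_successful(logs))))  # 57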

🔧 Basic Syntax and Usage

📝 Building Blocks

Let's start with the fundamental components:

# 👋 Hello, Functional Pipeline!
from functools import reduce
from typing import Callable, List, Any, TypeVar, Dict
import json

# 🎨 Type definitions for clarity
T = TypeVar('T')
Transformer = Callable[[T], T]

# 🔧 Core pipeline function
def pipeline(*functions: Transformer) -> Transformer:
    """
    Create a pipeline from multiple functions.
    Each function transforms the output of the previous one.
    """
    def pipe(data: Any) -> Any:
        return reduce(lambda result, func: func(result), functions, data)
    return pipe

# 🚀 Simple transformation functions
def clean_text(text: str) -> str:
    """Remove extra whitespace 🧹"""
    return ' '.join(text.split())

def to_uppercase(text: str) -> str:
    """Convert to uppercase 🔤"""
    return text.upper()

def add_emoji(text: str) -> str:
    """Add excitement! 🎉"""
    return f"{text} 🚀"

# 🎮 Let's use it!
text_pipeline = pipeline(
    clean_text,
    to_uppercase,
    add_emoji
)

result = text_pipeline("  hello   world  ")
print(result)  # HELLO WORLD 🚀

💡 Explanation: Notice how each function does one thing well! The pipeline combines them into a powerful data transformer.
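
Under the hood, the reduce call is just nested function application, so the pipeline above is equivalent to calling the three functions inside out:

# Same result as text_pipeline: apply clean_text, then to_uppercase, then add_emoji
nested = add_emoji(to_uppercase(clean_text("  hello   world  ")))
print(nested)  # HELLO WORLD 🚀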

🎯 Common Patterns

Here are patterns you'll use in real pipelines:

# ๐Ÿ—๏ธ Pattern 1: Data filtering and mapping
def filter_by(predicate: Callable) -> Callable:
    """Create a filter function ๐Ÿ”"""
    def filter_func(items: List) -> List:
        return [item for item in items if predicate(item)]
    return filter_func

def map_over(transformer: Callable) -> Callable:
    """Create a map function ๐Ÿ—บ๏ธ"""
    def map_func(items: List) -> List:
        return [transformer(item) for item in items]
    return map_func

# ๐ŸŽจ Pattern 2: Aggregation functions
def sum_by(key_func: Callable) -> Callable:
    """Sum values by a key function ๐Ÿ“Š"""
    def sum_func(items: List) -> float:
        return sum(key_func(item) for item in items)
    return sum_func

def group_by(key_func: Callable) -> Callable:
    """Group items by a key function ๐Ÿ“"""
    def group_func(items: List) -> Dict:
        result = {}
        for item in items:
            key = key_func(item)
            if key not in result:
                result[key] = []
            result[key].append(item)
        return result
    return group_func
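
These factories plug straight into the pipeline function from the previous snippet. A small usage sketch, with made-up purchase records:

# 🛒 Hypothetical purchase records
purchases = [
    {"item": "book", "price": 12.5, "qty": 2},
    {"item": "pen", "price": 1.5, "qty": 10},
    {"item": "desk", "price": 150.0, "qty": 1},
]

# 🔍 Keep affordable purchases, then 📊 sum their line totals
affordable_revenue = pipeline(
    filter_by(lambda p: p["price"] < 100),
    sum_by(lambda p: p["price"] * p["qty"]),
)
print(affordable_revenue(purchases))  # 40.0

# 📁 Group purchases by item name
print(group_by(lambda p: p["item"])(purchases).keys())  # dict_keys(['book', 'pen', 'desk'])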

💡 Practical Examples

🛒 Example 1: E-commerce Order Processing

Let's build a real order processing pipeline:

# ๐Ÿ›๏ธ Order processing pipeline
from datetime import datetime
from typing import List, Dict

# ๐Ÿ“ฆ Define our order structure
class Order:
    def __init__(self, order_id: str, customer: str, 
                 items: List[Dict], date: str, status: str):
        self.order_id = order_id
        self.customer = customer
        self.items = items  # [{"name": "...", "price": ..., "quantity": ...}]
        self.date = datetime.fromisoformat(date)
        self.status = status
        self.total = 0.0
    
    def to_dict(self) -> Dict:
        return {
            "order_id": self.order_id,
            "customer": self.customer,
            "items": self.items,
            "date": self.date.isoformat(),
            "status": self.status,
            "total": self.total,
            "emoji": "๐Ÿ“ฆ" if self.status == "shipped" else "๐Ÿ›’"
        }

# ๐Ÿ”ง Pipeline functions
def calculate_totals(orders: List[Order]) -> List[Order]:
    """Calculate order totals ๐Ÿ’ฐ"""
    for order in orders:
        order.total = sum(
            item["price"] * item["quantity"] 
            for item in order.items
        )
    return orders

def filter_active_orders(orders: List[Order]) -> List[Order]:
    """Keep only active orders ๐ŸŸข"""
    active_statuses = {"pending", "processing", "shipped"}
    return [o for o in orders if o.status in active_statuses]

def apply_discounts(min_total: float, discount: float) -> Callable:
    """Apply discount to large orders ๐ŸŽ"""
    def discount_func(orders: List[Order]) -> List[Order]:
        for order in orders:
            if order.total >= min_total:
                order.total *= (1 - discount)
                print(f"๐ŸŽ‰ Discount applied to order {order.order_id}!")
        return orders
    return discount_func

def sort_by_date(orders: List[Order]) -> List[Order]:
    """Sort orders by date ๐Ÿ“…"""
    return sorted(orders, key=lambda o: o.date, reverse=True)

def add_priority_flag(orders: List[Order]) -> List[Order]:
    """Flag high-value orders ๐ŸŒŸ"""
    for order in orders:
        if order.total > 100:
            order.priority = "HIGH"
            print(f"โญ High priority order: {order.order_id}")
        else:
            order.priority = "NORMAL"
    return orders

# ๐Ÿš€ Create the pipeline
order_pipeline = pipeline(
    calculate_totals,
    filter_active_orders,
    apply_discounts(50.0, 0.1),  # 10% off orders over $50
    sort_by_date,
    add_priority_flag
)

# ๐ŸŽฎ Test data
test_orders = [
    Order("001", "Alice", [
        {"name": "Python Book", "price": 29.99, "quantity": 2},
        {"name": "Coffee", "price": 4.99, "quantity": 3}
    ], "2024-01-15", "shipped"),
    
    Order("002", "Bob", [
        {"name": "Laptop", "price": 999.99, "quantity": 1}
    ], "2024-01-16", "processing"),
    
    Order("003", "Charlie", [
        {"name": "Mouse", "price": 19.99, "quantity": 1}
    ], "2024-01-14", "cancelled")
]

# ๐ŸŽฏ Process orders
processed_orders = order_pipeline(test_orders)
for order in processed_orders:
    print(f"{order.to_dict()['emoji']} Order {order.order_id}: ${order.total:.2f}")

🎯 Try it yourself: Add a function to generate shipping labels for high-priority orders!

🎮 Example 2: Real-time Analytics Pipeline

Let's create a pipeline for processing streaming data:

# ๐Ÿ† Analytics pipeline for user events
import statistics
from collections import defaultdict
from typing import List, Dict, Any

# ๐Ÿ“Š Event structure
class UserEvent:
    def __init__(self, user_id: str, event_type: str, 
                 timestamp: str, data: Dict[str, Any]):
        self.user_id = user_id
        self.event_type = event_type
        self.timestamp = datetime.fromisoformat(timestamp)
        self.data = data

# ๐Ÿ”ง Analytics functions
def enrich_with_session(events: List[UserEvent]) -> List[UserEvent]:
    """Add session information ๐Ÿ”—"""
    sessions = {}
    session_timeout = 1800  # 30 minutes
    
    for event in sorted(events, key=lambda e: e.timestamp):
        if event.user_id not in sessions:
            sessions[event.user_id] = {"id": f"session_{len(sessions)}", 
                                       "last_seen": event.timestamp}
        
        last_seen = sessions[event.user_id]["last_seen"]
        if (event.timestamp - last_seen).seconds > session_timeout:
            sessions[event.user_id] = {"id": f"session_{len(sessions)}", 
                                       "last_seen": event.timestamp}
        
        event.session_id = sessions[event.user_id]["id"]
        sessions[event.user_id]["last_seen"] = event.timestamp
    
    return events

def calculate_metrics(events: List[UserEvent]) -> Dict[str, Any]:
    """Calculate key metrics 📈"""
    metrics = {
        "total_events": len(events),
        "unique_users": len(set(e.user_id for e in events)),
        "events_by_type": defaultdict(int),
        "avg_events_per_user": 0,
        "peak_hour": None,
        "emoji": "📊"
    }
    
    # Count events by type
    for event in events:
        metrics["events_by_type"][event.event_type] += 1
    
    # Calculate average events per user
    user_counts = defaultdict(int)
    for event in events:
        user_counts[event.user_id] += 1
    
    if user_counts:
        metrics["avg_events_per_user"] = statistics.mean(user_counts.values())
    
    # Find peak hour
    hour_counts = defaultdict(int)
    for event in events:
        hour = event.timestamp.hour
        hour_counts[hour] += 1
    
    if hour_counts:
        peak_hour = max(hour_counts.items(), key=lambda x: x[1])
        metrics["peak_hour"] = f"{peak_hour[0]}:00 ({peak_hour[1]} events)"
    
    return metrics

def detect_anomalies(threshold: float = 3.0) -> Callable:
    """Detect unusual activity patterns 🚨"""
    def anomaly_detector(events: List[UserEvent]) -> List[Dict]:
        user_counts = defaultdict(int)
        for event in events:
            user_counts[event.user_id] += 1
        
        if not user_counts:
            return []
        
        mean = statistics.mean(user_counts.values())
        stdev = statistics.stdev(user_counts.values()) if len(user_counts) > 1 else 0
        
        anomalies = []
        for user_id, count in user_counts.items():
            if stdev > 0 and abs(count - mean) > threshold * stdev:
                anomalies.append({
                    "user_id": user_id,
                    "event_count": count,
                    "severity": "HIGH" if count > mean else "LOW",
                    "emoji": "🚨" if count > mean else "⚠️"
                })
        
        return anomalies
    return anomaly_detector

# 🚀 Create analytics pipeline
def create_analytics_pipeline():
    """Build the complete analytics pipeline 🏗️"""
    
    def analyze(events: List[UserEvent]) -> Dict[str, Any]:
        # Enrich events
        enriched = enrich_with_session(events)
        
        # Calculate metrics
        metrics = calculate_metrics(enriched)
        
        # Detect anomalies
        anomalies = detect_anomalies(2.5)(enriched)
        
        return {
            "metrics": metrics,
            "anomalies": anomalies,
            "processed_at": datetime.now().isoformat(),
            "pipeline_status": "✅ Success"
        }
    
    return analyze

# 🎮 Test the pipeline
test_events = [
    UserEvent("user1", "page_view", "2024-01-15T10:00:00", {"page": "/home"}),
    UserEvent("user1", "click", "2024-01-15T10:05:00", {"button": "login"}),
    UserEvent("user2", "page_view", "2024-01-15T10:10:00", {"page": "/products"}),
    # Add many more events...
    UserEvent("user1", "purchase", "2024-01-15T11:00:00", {"amount": 99.99}),
]

analytics = create_analytics_pipeline()
results = analytics(test_events)
print(f"{results['pipeline_status']} Processed {results['metrics']['total_events']} events")

🚀 Advanced Concepts

🧙‍♂️ Advanced Topic 1: Async Pipelines

When you're ready to level up, try async pipelines:

# 🎯 Async pipeline for I/O operations
import asyncio
from typing import Any, Callable, Dict

def async_pipeline(*functions: Callable) -> Callable:
    """Create an async pipeline ⚡ (accepts both sync and async steps)"""
    async def pipe(data: Any) -> Any:
        result = data
        for func in functions:
            if asyncio.iscoroutinefunction(func):
                result = await func(result)
            else:
                result = func(result)
        return result
    return pipe

# 🌐 Async data fetcher
async def fetch_data(url: str) -> Dict:
    """Simulate async data fetching 📡"""
    await asyncio.sleep(0.1)  # Simulate network delay
    return {"url": url, "data": "async data", "emoji": "🚀"}

# 🔄 Transform async data
async def transform_async(data: Dict) -> Dict:
    """Async transformation ✨"""
    await asyncio.sleep(0.05)
    data["transformed"] = True
    return data

# 🎮 Use async pipeline
async def main():
    async_pipe = async_pipeline(
        fetch_data,
        transform_async,
        lambda d: {**d, "final": True}
    )
    
    result = await async_pipe("https://api.example.com")
    print(f"🎉 Async result: {result}")

asyncio.run(main())

๐Ÿ—๏ธ Advanced Topic 2: Pipeline Composition

For complex workflows, compose pipelines:

# 🚀 Composable pipeline builder
class PipelineBuilder:
    def __init__(self):
        self.steps = []
        self.error_handlers = {}
    
    def add_step(self, func: Callable, name: str = None):
        """Add a transformation step 🔧"""
        self.steps.append((name or func.__name__, func))
        return self
    
    def add_error_handler(self, error_type: type, handler: Callable):
        """Add error handling 🛡️"""
        self.error_handlers[error_type] = handler
        return self
    
    def add_logging(self):
        """Add automatic logging 📝"""
        def log_wrapper(func):
            def wrapped(*args, **kwargs):
                print(f"🔍 Executing: {func.__name__}")
                result = func(*args, **kwargs)
                print(f"✅ Completed: {func.__name__}")
                return result
            return wrapped
        
        self.steps = [(name, log_wrapper(func)) for name, func in self.steps]
        return self
    
    def build(self) -> Callable:
        """Build the final pipeline 🏗️"""
        def execute(data: Any) -> Any:
            result = data
            for name, func in self.steps:
                try:
                    result = func(result)
                except Exception as e:
                    error_type = type(e)
                    if error_type in self.error_handlers:
                        print(f"⚠️ Handling error in {name}: {e}")
                        result = self.error_handlers[error_type](result, e)
                    else:
                        raise
            return result
        return execute

# 🎨 Use the builder (named data_pipeline so it doesn't shadow the pipeline() helper)
data_pipeline = (PipelineBuilder()
    .add_step(clean_data, "clean")
    .add_step(validate_data, "validate")
    .add_step(transform_data, "transform")
    .add_error_handler(ValueError, lambda data, e: {"error": str(e), "data": data})
    .add_logging()
    .build())
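
The builder call above assumes clean_data, validate_data, and transform_data already exist. Here is one minimal, hypothetical set of step functions (define them before the builder call) that makes the example runnable end to end:

# 🔧 Hypothetical step implementations for the builder example
def clean_data(record: Dict) -> Dict:
    """Strip whitespace from string values"""
    return {k: v.strip() if isinstance(v, str) else v for k, v in record.items()}

def validate_data(record: Dict) -> Dict:
    """Reject records without a name"""
    if not record.get("name"):
        raise ValueError("missing name")
    return record

def transform_data(record: Dict) -> Dict:
    """Add a derived field"""
    return {**record, "name_upper": record["name"].upper()}

print(data_pipeline({"name": "  ada  ", "age": 36}))
# Logs each step, then prints {'name': 'ada', 'age': 36, 'name_upper': 'ADA'}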

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Mutating Data

# โŒ Wrong way - mutating input data!
def bad_transform(data: List[Dict]) -> List[Dict]:
    for item in data:
        item["processed"] = True  # ๐Ÿ’ฅ Mutates original!
    return data

# โœ… Correct way - create new data!
def good_transform(data: List[Dict]) -> List[Dict]:
    return [
        {**item, "processed": True}  # โœจ Creates new dict
        for item in data
    ]

🤯 Pitfall 2: Side Effects in Pipeline

# โŒ Dangerous - side effects in pipeline!
processed_count = 0  # ๐Ÿ˜ฐ Global state

def bad_counter(items: List) -> List:
    global processed_count
    processed_count += len(items)  # ๐Ÿ’ฅ Side effect!
    return items

# โœ… Safe - return all needed data!
def good_counter(items: List) -> Dict:
    return {
        "items": items,
        "count": len(items),
        "timestamp": datetime.now().isoformat()
    }

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Keep Functions Pure: No side effects, same input = same output
  2. ๐Ÿ“ Type Everything: Use type hints for clarity
  3. ๐Ÿ›ก๏ธ Handle Errors Gracefully: Plan for failures
  4. ๐ŸŽจ Compose Small Functions: Each does one thing well
  5. โœจ Test Individual Steps: Ensure each function works correctly
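
Point 5 is cheap precisely because every step is a pure function; a handful of assert-based tests covers it. A minimal sketch, written as pytest-style test functions against the text pipeline from the first example:

# 🧪 Each pipeline step can be tested in isolation
def test_clean_text():
    assert clean_text("  hello   world  ") == "hello world"

def test_to_uppercase():
    assert to_uppercase("hello") == "HELLO"

def test_full_pipeline():
    # The composed pipeline is just another function, so it is testable too
    assert text_pipeline("  hi  there ") == "HI THERE 🚀"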

🧪 Hands-On Exercise

🎯 Challenge: Build a Log Analysis Pipeline

Create a pipeline to analyze application logs:

📋 Requirements:

  • ✅ Parse log entries from different formats
  • 🏷️ Extract timestamps, levels, and messages
  • 👤 Group by error severity
  • 📅 Calculate error rates over time
  • 🎨 Generate a summary report with emojis!

🚀 Bonus Points:

  • Add real-time alerting for critical errors
  • Implement log pattern detection
  • Create visualization data for charts

💡 Solution

๐Ÿ” Click to see solution
# 🎯 Complete log analysis pipeline!
import re
from datetime import datetime
from collections import defaultdict
from typing import Callable, Dict, List

# 📝 Log entry structure
class LogEntry:
    def __init__(self, timestamp: str, level: str, message: str):
        self.timestamp = datetime.fromisoformat(timestamp)
        self.level = level.upper()
        self.message = message
        self.emoji = self._get_emoji()
    
    def _get_emoji(self) -> str:
        emoji_map = {
            "ERROR": "🔴",
            "WARNING": "🟡",
            "INFO": "🔵",
            "DEBUG": "⚪",
            "CRITICAL": "🚨"
        }
        return emoji_map.get(self.level, "📝")

# 🔧 Pipeline functions
def parse_logs(raw_logs: List[str]) -> List[LogEntry]:
    """Parse raw log strings 📄"""
    entries = []
    pattern = r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)'
    
    for log in raw_logs:
        match = re.match(pattern, log)
        if match:
            timestamp, level, message = match.groups()
            entries.append(LogEntry(timestamp, level, message))
    
    return entries

def filter_by_level(min_level: str) -> Callable:
    """Filter logs by minimum severity 🔍"""
    level_order = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    min_index = level_order.index(min_level.upper())
    
    def filter_func(entries: List[LogEntry]) -> List[LogEntry]:
        return [
            entry for entry in entries 
            if level_order.index(entry.level) >= min_index
        ]
    return filter_func

def calculate_error_rates(entries: List[LogEntry]) -> Dict[str, float]:
    """Calculate error rates by hour 📊"""
    hourly_counts = defaultdict(lambda: {"total": 0, "errors": 0})
    
    for entry in entries:
        hour_key = entry.timestamp.strftime("%Y-%m-%d %H:00")
        hourly_counts[hour_key]["total"] += 1
        if entry.level in ["ERROR", "CRITICAL"]:
            hourly_counts[hour_key]["errors"] += 1
    
    rates = {}
    for hour, counts in hourly_counts.items():
        rate = (counts["errors"] / counts["total"] * 100) if counts["total"] > 0 else 0
        rates[hour] = round(rate, 2)
    
    return rates

def detect_patterns(entries: List[LogEntry]) -> List[Dict]:
    """Detect common error patterns 🔎"""
    error_messages = [e.message for e in entries if e.level == "ERROR"]
    
    # Simple pattern detection
    patterns = defaultdict(int)
    for msg in error_messages:
        # Extract common patterns
        if "timeout" in msg.lower():
            patterns["Timeout Errors"] += 1
        elif "connection" in msg.lower():
            patterns["Connection Errors"] += 1
        elif "null" in msg.lower() or "undefined" in msg.lower():
            patterns["Null Reference Errors"] += 1
    
    return [
        {"pattern": pattern, "count": count, "emoji": "🔍"}
        for pattern, count in patterns.items()
        if count > 1
    ]

def generate_summary(entries: List[LogEntry], 
                    error_rates: Dict[str, float], 
                    patterns: List[Dict]) -> Dict:
    """Generate analysis summary 📋"""
    level_counts = defaultdict(int)
    for entry in entries:
        level_counts[entry.level] += 1
    
    return {
        "total_logs": len(entries),
        "time_range": {
            "start": min(e.timestamp for e in entries).isoformat(),
            "end": max(e.timestamp for e in entries).isoformat()
        },
        "level_distribution": dict(level_counts),
        "average_error_rate": round(sum(error_rates.values()) / len(error_rates), 2) if error_rates else 0,
        "peak_error_hour": max(error_rates.items(), key=lambda x: x[1]) if error_rates else None,
        "detected_patterns": patterns,
        "health_status": "🟢 Healthy" if level_counts["ERROR"] < 10 else "🔴 Needs Attention",
        "emoji": "📊"
    }

# 🚀 Build the complete pipeline
log_analysis_pipeline = pipeline(
    parse_logs,
    filter_by_level("INFO"),
    lambda entries: {
        "entries": entries,
        "error_rates": calculate_error_rates(entries),
        "patterns": detect_patterns(entries)
    },
    lambda data: generate_summary(
        data["entries"], 
        data["error_rates"], 
        data["patterns"]
    )
)

# 🎮 Test it!
test_logs = [
    "2024-01-15T10:00:00 [INFO] Application started",
    "2024-01-15T10:05:00 [ERROR] Connection timeout to database",
    "2024-01-15T10:10:00 [WARNING] High memory usage detected",
    "2024-01-15T10:15:00 [ERROR] Null reference in user module",
    "2024-01-15T10:20:00 [CRITICAL] System out of memory",
    # Add more test logs...
]

result = log_analysis_pipeline(test_logs)
print(f"{result['emoji']} Log Analysis Complete!")
print(f"Health Status: {result['health_status']}")
print(f"Total Logs: {result['total_logs']}")

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Build functional pipelines with confidence 💪
  • ✅ Compose complex operations from simple functions 🛡️
  • ✅ Process data immutably without side effects 🎯
  • ✅ Handle errors gracefully in your pipelines 🐛
  • ✅ Create reusable, testable data transformations! 🚀

Remember: Functional programming is about building reliable, composable systems. Each function is a building block! 🤝

🤝 Next Steps

Congratulations! 🎉 You've mastered functional data processing pipelines!

Here's what to do next:

  1. 💻 Practice with the exercises above
  2. 🏗️ Build a pipeline for your own data processing needs
  3. 📚 Explore libraries like toolz or fn.py for more FP tools (see the quick taste below)
  4. 🌟 Share your pipeline creations with the community!
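
For point 3, the third-party toolz library ships a ready-made pipe helper that behaves much like the pipeline function built in this tutorial. A quick taste, assuming toolz is installed (pip install toolz) and reusing the text functions from the first example:

# 🚀 Same idea with toolz: pipe threads a value through the functions left to right
from toolz import pipe

print(pipe("  hello   world  ", clean_text, to_uppercase, add_emoji))
# HELLO WORLD 🚀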

Remember: Every data engineer started with their first pipeline. Keep building, keep learning, and most importantly, have fun! 🚀


Happy coding! 🎉🚀✨