🔄 MLOps

Machine Learning Operations

What is MLOps?

MLOps (Machine Learning Operations) combines ML, DevOps, and data engineering to deploy and maintain ML systems reliably in production.

MLOps Principles:

  • Automation: CI/CD for ML pipelines
  • Versioning: Code, data, and models (a minimal sketch follows this list)
  • Monitoring: Track model performance
  • Reproducibility: Consistent results
  • Collaboration: Data scientists + engineers
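
A minimal sketch of the versioning and reproducibility ideas in plain Python. It assumes the project lives in a git repository and the training data sits in data/train.csv; run_manifest is a hypothetical helper, not part of any specific MLOps tool.

# Minimal "version everything" sketch (hypothetical helper, not a specific tool)
import hashlib
import json
import random
import subprocess

import numpy as np

def run_manifest(data_path, params, seed=42):
    """Record what is needed to reproduce a training run."""
    # Reproducibility: pin random seeds
    random.seed(seed)
    np.random.seed(seed)

    # Code version: current git commit (assumes the project is a git repo)
    commit = subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip()

    # Data version: content hash of the training file
    with open(data_path, "rb") as f:
        data_hash = hashlib.sha256(f.read()).hexdigest()

    return {"git_commit": commit, "data_sha256": data_hash,
            "params": params, "seed": seed}

# Store the manifest next to the trained model artifact
manifest = run_manifest("data/train.csv", {"n_estimators": 100, "max_depth": 10})
print(json.dumps(manifest, indent=2))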

🔄 ML Lifecycle

# Typical ML workflow stages (a runnable sketch follows this list):

1. Data Collection & Preparation
   - Gather data from sources
   - Clean and validate
   - Version control data

2. Feature Engineering
   - Create features
   - Feature store for reuse

3. Model Training
   - Experiment tracking
   - Hyperparameter tuning
   - Model versioning

4. Model Evaluation
   - Test on validation set
   - Compare with baseline
   - A/B testing

5. Deployment
   - Package model
   - Deploy to production
   - Gradual rollout

6. Monitoring
   - Track predictions
   - Detect drift
   - Alert on anomalies

7. Retraining
   - Trigger based on performance
   - Automated retraining pipeline
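
A compact, runnable walk-through of these stages on synthetic data (scikit-learn only; the majority-class baseline, deployment gate, and file name are placeholders for illustration):

# Lifecycle sketch: data -> features -> training -> evaluation -> deployment gate
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib

# 1-2. Data collection/preparation + feature engineering (synthetic stand-in)
X, y = make_classification(n_samples=1000, n_features=20, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
scaler = StandardScaler().fit(X_train)
X_train, X_test = scaler.transform(X_train), scaler.transform(X_test)

# 3-4. Training and evaluation against a trivial majority-class baseline
model = RandomForestClassifier(n_estimators=100, random_state=0)
model.fit(X_train, y_train)
baseline = max(y_train.mean(), 1 - y_train.mean())
test_score = model.score(X_test, y_test)

# 5. "Deployment": package the artifact only if it beats the baseline
if test_score > baseline:
    joblib.dump({"scaler": scaler, "model": model}, "model.joblib")

# 6-7. Monitoring and retraining are covered in the sections below
print(f"baseline={baseline:.3f}, test_accuracy={test_score:.3f}")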

📊 Experiment Tracking - MLflow

# pip install mlflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Start MLflow run
with mlflow.start_run():
    # Generate data
    X, y = make_classification(n_samples=1000, n_features=20)
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    
    # Log parameters
    n_estimators = 100
    max_depth = 10
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    
    # Train model
    model = RandomForestClassifier(n_estimators=n_estimators, 
                                   max_depth=max_depth)
    model.fit(X_train, y_train)
    
    # Log metrics
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    mlflow.log_metric("train_accuracy", train_score)
    mlflow.log_metric("test_accuracy", test_score)
    
    # Log model
    mlflow.sklearn.log_model(model, "model")
    
    print(f"Run ID: {mlflow.active_run().info.run_id}")

# View results: mlflow ui
# Access at http://localhost:5000
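
Once a few runs are logged, they can also be compared programmatically through the tracking API (this assumes the same local tracking store as above):

# Compare logged runs; mlflow.search_runs returns a pandas DataFrame
import mlflow

runs = mlflow.search_runs(order_by=["metrics.test_accuracy DESC"])
print(runs[["run_id", "params.n_estimators", "params.max_depth",
            "metrics.test_accuracy"]].head())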

📦 Model Registry

# Register model with MLflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register model (replace RUN_ID with the run ID printed by the training run)
model_uri = "runs:/RUN_ID/model"
mlflow.register_model(model_uri, "RandomForestClassifier")

# Transition to production
client.transition_model_version_stage(
    name="RandomForestClassifier",
    version=1,
    stage="Production"
)

# Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/RandomForestClassifier/Production"
)

# Use model
predictions = model.predict(X_test)
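
The registry can also be queried to see which versions exist and what stage each one is in (same client and model name as above):

# List registered versions and their current stages
for mv in client.search_model_versions("name='RandomForestClassifier'"):
    print(mv.version, mv.current_stage, mv.run_id)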

🔗 Data Versioning - DVC

# pip install dvc

# Initialize DVC
dvc init

# Track data file
dvc add data/train.csv

# Commit to git
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"

# Push data to remote storage (S3, GCS, etc.)
dvc remote add -d storage s3://my-bucket/dvc-storage
dvc push

# Pull data on another machine
git pull
dvc pull

# Create pipeline stages (newer DVC versions use 'dvc stage add' in place of 'dvc run')
dvc run -n prepare \
    -d data/raw.csv \
    -o data/processed.csv \
    python prepare.py

dvc run -n train \
    -d data/processed.csv \
    -o model.pkl \
    python train.py

# Reproduce pipeline
dvc repro
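
DVC also exposes a Python API, so training code can read a specific data version directly; a small sketch assuming the repo layout above (rev can be any git commit, branch, or tag):

# Read a DVC-tracked file at a given git revision
import pandas as pd
import dvc.api

with dvc.api.open("data/train.csv", rev="HEAD") as f:
    train_df = pd.read_csv(f)

print(train_df.shape)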

🤖 CI/CD Pipeline

GitHub Actions

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]

jobs:
  train-and-deploy:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: "3.9"
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
    
    - name: Run tests
      run: |
        pytest tests/
    
    - name: Train model
      run: |
        python train.py
    
    - name: Evaluate model
      run: |
        python evaluate.py
    
    - name: Build Docker image
      run: |
        docker build -t ml-model:${{ github.sha }} .
    
    - name: Deploy to production
      if: success()
      run: |
        # Deploy commands here
        echo "Deploying model..."

📈 Model Monitoring

import numpy as np
from scipy import stats

class ModelMonitor:
    def __init__(self, reference_data):
        self.reference_data = reference_data
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
    
    def detect_drift(self, new_data, threshold=0.05):
        """Detect distribution drift using KS test"""
        drifted_features = []
        
        for i in range(new_data.shape[1]):
            # Kolmogorov-Smirnov test
            statistic, p_value = stats.ks_2samp(
                self.reference_data[:, i],
                new_data[:, i]
            )
            
            if p_value < threshold:
                drifted_features.append(i)
        
        return drifted_features
    
    def check_input_quality(self, data):
        """Check for data quality issues"""
        issues = []
        
        # Missing values
        if np.any(np.isnan(data)):
            issues.append("Missing values detected")
        
        # Out of range
        z_scores = np.abs((data - self.reference_mean) / self.reference_std)
        if np.any(z_scores > 3):
            issues.append("Outliers detected (z-score > 3)")
        
        return issues

# Usage
monitor = ModelMonitor(X_train)
drifted = monitor.detect_drift(X_test)
if drifted:
    print(f"Drift detected in features: {drifted}")
    # Trigger retraining

issues = monitor.check_input_quality(X_test)
if issues:
    print(f"Quality issues: {issues}")

🔔 Alerting

# Monitor model performance and alert
import smtplib                          # only needed if send_alert emails the message
from email.mime.text import MIMEText    # (the sketch below just prints the alert)

class PerformanceMonitor:
    def __init__(self, baseline_accuracy=0.85, threshold=0.05):
        self.baseline_accuracy = baseline_accuracy
        self.threshold = threshold
        self.recent_predictions = []
        self.recent_actuals = []
    
    def log_prediction(self, prediction, actual):
        self.recent_predictions.append(prediction)
        self.recent_actuals.append(actual)
        
        # Keep last 100 predictions
        if len(self.recent_predictions) > 100:
            self.recent_predictions.pop(0)
            self.recent_actuals.pop(0)
    
    def check_performance(self):
        if len(self.recent_predictions) < 10:
            return True
        
        # Calculate recent accuracy
        correct = sum(p == a for p, a in 
                     zip(self.recent_predictions, self.recent_actuals))
        accuracy = correct / len(self.recent_predictions)
        
        # Check if degraded
        if accuracy < self.baseline_accuracy - self.threshold:
            self.send_alert(accuracy)
            return False
        
        return True
    
    def send_alert(self, current_accuracy):
        message = f"""
        Model performance degradation detected!
        
        Baseline accuracy: {self.baseline_accuracy:.2%}
        Current accuracy: {current_accuracy:.2%}
        Threshold: {self.threshold:.2%}
        
        Action required: Investigate and retrain model.
        """
        
        print(f"ALERT: {message}")
        # Send email/Slack notification here

# Usage
monitor = PerformanceMonitor(baseline_accuracy=0.90)

for pred, actual in zip(predictions, actuals):
    monitor.log_prediction(pred, actual)
    if not monitor.check_performance():
        # Trigger retraining pipeline
        pass
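
One way to fill in the notification placeholder is a Slack incoming webhook; the URL below is a placeholder, and email via smtplib follows the same pattern:

# Deliver the alert via a Slack incoming webhook (placeholder URL)
import requests

WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def notify_slack(message):
    response = requests.post(WEBHOOK_URL, json={"text": message}, timeout=10)
    response.raise_for_status()

# e.g. call notify_slack(message) at the end of PerformanceMonitor.send_alert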

🔄 Automated Retraining

# Scheduled retraining pipeline
from datetime import datetime
import joblib     # needed for joblib.dump below
import schedule

class RetrainingPipeline:
    def __init__(self, model, data_loader):
        self.model = model
        self.data_loader = data_loader
        self.last_trained = None
    
    def should_retrain(self):
        # Check conditions
        if self.last_trained is None:
            return True
        
        # Retrain if model is old
        days_since_training = (datetime.now() - self.last_trained).days
        if days_since_training > 7:
            return True
        
        # Retrain if performance dropped
        # (implement your logic)
        
        return False
    
    def retrain(self):
        print(f"Starting retraining at {datetime.now()}")
        
        # Load new data
        X_train, y_train = self.data_loader.load_latest()
        
        # Train model
        self.model.fit(X_train, y_train)
        
        # Evaluate
        X_val, y_val = self.data_loader.load_validation()
        score = self.model.score(X_val, y_val)
        
        if score > 0.85:  # Threshold
            # Save and deploy
            joblib.dump(self.model, 'model_new.joblib')
            self.last_trained = datetime.now()
            print(f"Retraining successful. Score: {score:.3f}")
            return True
        else:
            print(f"Retraining failed. Score: {score:.3f}")
            return False
    
    def run(self):
        if self.should_retrain():
            self.retrain()

# Schedule retraining (a long-running process must execute the pending jobs)
pipeline = RetrainingPipeline(model, data_loader)

schedule.every().day.at("02:00").do(pipeline.run)
# ...then, in the scheduler process:
#     while True:
#         schedule.run_pending()
#         time.sleep(60)

# Or trigger on demand
if drift_detected or performance_dropped:
    pipeline.retrain()

🛠️ MLOps Tools

  • MLflow (experiment tracking): track runs, model registry, deployment
  • DVC (data versioning): Git for data, pipeline management
  • Kubeflow (ML on Kubernetes): pipelines, serving, distributed training
  • Airflow (workflow orchestration): DAGs, scheduling, monitoring
  • Weights & Biases (experiment tracking): visualizations, collaboration
  • SageMaker (end-to-end ML on AWS): training, deployment, monitoring

💡 MLOps Best Practices

  • Version everything: code, data, models, and environments
  • Automate training, testing, and deployment with CI/CD pipelines
  • Track experiments so results stay reproducible
  • Monitor production models for drift and performance degradation
  • Define clear triggers (schedule, drift, accuracy drop) for retraining
  • Keep data scientists and engineers working from the same pipelines

🎯 Key Takeaways

  • MLOps applies DevOps discipline to the whole ML lifecycle, from data collection to monitoring
  • Tools such as MLflow, DVC, and GitHub Actions cover experiment tracking, versioning, and automation
  • Deployment is not the end: monitoring, alerting, and automated retraining keep a model reliable