🚨 Anomaly Detection

Find unusual patterns and outliers

What is Anomaly Detection?

Anomaly detection identifies data points that differ significantly from the majority of the data. It is widely used in fraud detection, network security, quality control, and system monitoring.

Applications:

  • Fraud detection: Credit card fraud, insurance claims
  • Network security: Intrusion detection
  • Manufacturing: Defect detection
  • Healthcare: Disease outbreak detection
  • System monitoring: Server performance anomalies

🌳 Isolation Forest

from sklearn.ensemble import IsolationForest
from sklearn.datasets import make_blobs
import numpy as np
import matplotlib.pyplot as plt

# Generate normal data + outliers
X_normal, _ = make_blobs(n_samples=300, centers=1, n_features=2, 
                         cluster_std=1, random_state=42)
X_outliers = np.random.uniform(low=-8, high=8, size=(20, 2))
X = np.vstack([X_normal, X_outliers])

# Isolation Forest
iso_forest = IsolationForest(
    contamination=0.1,  # Expected proportion of outliers
    random_state=42,
    n_estimators=100
)

# Fit and predict (-1 for outliers, 1 for inliers)
y_pred = iso_forest.fit_predict(X)

# Get anomaly scores (lower = more anomalous)
scores = iso_forest.score_samples(X)

print(f"Number of anomalies detected: {(y_pred == -1).sum()}")
print(f"Score range: [{scores.min():.3f}, {scores.max():.3f}]")

# Visualize
plt.figure(figsize=(10, 6))
plt.scatter(X[y_pred == 1, 0], X[y_pred == 1, 1], 
           c='blue', label='Normal', alpha=0.6)
plt.scatter(X[y_pred == -1, 0], X[y_pred == -1, 1], 
           c='red', label='Anomaly', s=100, marker='x')
plt.title('Isolation Forest Anomaly Detection')
plt.legend()
plt.show()

How It Works:

Isolation Forest grows an ensemble of random trees; each tree repeatedly picks a random feature and a random split value. Anomalies sit in sparse regions, so they are isolated in few splits, while points deep inside a dense cluster need many. The anomaly score is derived from the average path length across all trees: the shorter the path, the more anomalous the point.
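To build intuition, here is a minimal sketch of the isolation idea (not scikit-learn's actual tree construction): pick a random feature and a random split, keep the side containing the target point, and count the splits needed to isolate it. It reuses the X built above; the inlier/outlier indices follow from how X was stacked (300 blob points, then 20 uniform outliers).

def isolation_depth(X, point_idx, rng):
    """Count the random splits needed to isolate one point (toy version)."""
    idx = np.arange(len(X))
    depth = 0
    while len(idx) > 1:
        feat = rng.integers(X.shape[1])
        vals = X[idx, feat]
        lo, hi = vals.min(), vals.max()
        if lo == hi:  # all remaining values identical; cannot split
            break
        split = rng.uniform(lo, hi)
        keep = vals < split if X[point_idx, feat] < split else vals >= split
        idx = idx[keep]
        depth += 1
    return depth

rng = np.random.default_rng(0)
for i, name in [(0, 'inlier (blob point)'), (len(X) - 1, 'outlier (uniform point)')]:
    depths = [isolation_depth(X, i, rng) for _ in range(100)]
    print(f"{name}: mean isolation depth = {np.mean(depths):.1f}")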

🎯 Local Outlier Factor (LOF)

from sklearn.neighbors import LocalOutlierFactor

# LOF: compares the local density of each point to that of its neighbors
lof = LocalOutlierFactor(
    n_neighbors=20,
    contamination=0.1
)

# Fit and predict
y_pred_lof = lof.fit_predict(X)

# Negative outlier factor (lower = more anomalous)
scores_lof = lof.negative_outlier_factor_

print(f"Number of anomalies: {(y_pred_lof == -1).sum()}")
print(f"LOF scores range: [{scores_lof.min():.3f}, {scores_lof.max():.3f}]")

# Visualize
plt.figure(figsize=(10, 6))
plt.scatter(X[y_pred_lof == 1, 0], X[y_pred_lof == 1, 1],
           c='blue', label='Normal', alpha=0.6)
plt.scatter(X[y_pred_lof == -1, 0], X[y_pred_lof == -1, 1],
           c='red', label='Anomaly', s=100, marker='x')
plt.title('Local Outlier Factor')
plt.legend()
plt.show()

How It Works:

LOF compares the local density around each point with the local densities around its k nearest neighbors. A point whose neighborhood is much sparser than its neighbors' neighborhoods gets a factor well above 1 and is flagged as an outlier. Because the comparison is local, LOF can catch outliers relative to clusters of different densities, where a single global threshold would fail.
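A simplified version of this density comparison can be written directly with NearestNeighbors; a sketch using plain average k-NN distance as the density proxy (the real LOF uses reachability distances, so the numbers will differ):

from sklearn.neighbors import NearestNeighbors

k = 20
nn = NearestNeighbors(n_neighbors=k).fit(X)
dist, ind = nn.kneighbors(X)         # column 0 is each point itself (distance 0)
avg_dist = dist[:, 1:].mean(axis=1)  # mean distance to the k-1 true neighbors
density = 1.0 / avg_dist             # sparser neighborhood -> lower density

# LOF-like score: neighbors' average density divided by the point's own density
ratio = density[ind[:, 1:]].mean(axis=1) / density
print("Most outlier-like ratios:", np.sort(ratio)[-5:].round(2))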

🔵 One-Class SVM

from sklearn.svm import OneClassSVM

# One-Class SVM: learns a decision boundary around the normal data
# (like other kernel methods, it works best on standardized features)
oc_svm = OneClassSVM(
    kernel='rbf',
    gamma='auto',
    nu=0.1  # Upper bound on the fraction of training errors (~expected outlier fraction)
)

y_pred_svm = oc_svm.fit_predict(X)

print(f"Number of anomalies: {(y_pred_svm == -1).sum()}")

# Signed distance to the boundary (negative = outside = anomaly)
scores_svm = oc_svm.decision_function(X)

# Visualize
plt.figure(figsize=(10, 6))
plt.scatter(X[y_pred_svm == 1, 0], X[y_pred_svm == 1, 1],
           c='blue', label='Normal', alpha=0.6)
plt.scatter(X[y_pred_svm == -1, 0], X[y_pred_svm == -1, 1],
           c='red', label='Anomaly', s=100, marker='x')
plt.title('One-Class SVM')
plt.legend()
plt.show()
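To see the boundary the model has learned, evaluate its decision function on a grid and draw the zero level set; a short sketch (grid limits chosen to cover the data generated above):

# Sketch: visualize the learned boundary (decision_function == 0)
xx, yy = np.meshgrid(np.linspace(-9, 9, 200), np.linspace(-9, 9, 200))
Z = oc_svm.decision_function(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)

plt.figure(figsize=(10, 6))
plt.contourf(xx, yy, Z, levels=20, cmap='RdBu', alpha=0.4)
plt.contour(xx, yy, Z, levels=[0], colors='k', linewidths=2)  # the boundary
plt.scatter(X[:, 0], X[:, 1], c='blue', s=15, alpha=0.6)
plt.title('One-Class SVM Decision Boundary')
plt.show()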

🧠 Autoencoder for Anomaly Detection

from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import StandardScaler

# Scale data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Build autoencoder
input_dim = X_scaled.shape[1]
encoding_dim = 1

# Encoder
encoder = keras.Sequential([
    layers.Dense(8, activation='relu', input_shape=(input_dim,)),
    layers.Dense(4, activation='relu'),
    layers.Dense(encoding_dim, activation='relu')
])

# Decoder
decoder = keras.Sequential([
    layers.Dense(4, activation='relu', input_shape=(encoding_dim,)),
    layers.Dense(8, activation='relu'),
    layers.Dense(input_dim, activation='linear')
])

# Autoencoder
autoencoder = keras.Sequential([encoder, decoder])

autoencoder.compile(optimizer='adam', loss='mse')

# Train only on points the Isolation Forest labeled as normal (pseudo-labels for this demo)
X_train = X_scaled[y_pred == 1]
history = autoencoder.fit(
    X_train, X_train,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    verbose=0
)

# Reconstruction error
X_reconstructed = autoencoder.predict(X_scaled)
reconstruction_error = np.mean(np.square(X_scaled - X_reconstructed), axis=1)

# Threshold: flag the top 10% of reconstruction errors (matches contamination=0.1)
threshold = np.percentile(reconstruction_error, 90)
y_pred_ae = np.where(reconstruction_error > threshold, -1, 1)  # -1 = anomaly

print(f"Threshold: {threshold:.4f}")
print(f"Anomalies detected: {(y_pred_ae == -1).sum()}")

# Plot reconstruction error
plt.figure(figsize=(10, 6))
plt.hist(reconstruction_error[y_pred == 1], bins=50, 
         alpha=0.6, label='Normal')
plt.hist(reconstruction_error[y_pred == -1], bins=50, 
         alpha=0.6, label='Anomaly')
plt.axvline(threshold, color='r', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.show()

📊 Statistical Methods

Z-Score Method

# Detect outliers using standard deviation
def zscore_anomalies(data, threshold=3):
    mean = np.mean(data, axis=0)
    std = np.std(data, axis=0)
    z_scores = np.abs((data - mean) / std)
    
    # Any feature with z-score > threshold
    return np.any(z_scores > threshold, axis=1)

y_pred_zscore = zscore_anomalies(X, threshold=3)
print(f"Z-score anomalies: {y_pred_zscore.sum()}")

Interquartile Range (IQR)

# IQR method for outlier detection
def iqr_anomalies(data):
    Q1 = np.percentile(data, 25, axis=0)
    Q3 = np.percentile(data, 75, axis=0)
    IQR = Q3 - Q1
    
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    outliers = np.any((data < lower_bound) | (data > upper_bound), axis=1)
    return outliers

y_pred_iqr = iqr_anomalies(X)
print(f"IQR anomalies: {y_pred_iqr.sum()}")

📈 Time Series Anomaly Detection

# Generate time series with anomalies
np.random.seed(42)
n = 1000
t = np.linspace(0, 100, n)
signal = np.sin(t) + 0.1 * np.random.randn(n)

# Add anomalies
anomaly_indices = [200, 400, 600, 800]
signal[anomaly_indices] += np.random.uniform(2, 4, len(anomaly_indices))

# Sliding-window approach: score each point against the stats of the preceding window
window_size = 20

def detect_ts_anomalies(data, window_size, threshold=3):
    anomalies = []
    for i in range(window_size, len(data)):
        window = data[i-window_size:i]
        mean = np.mean(window)
        std = np.std(window)
        
        z_score = abs((data[i] - mean) / std)
        if z_score > threshold:
            anomalies.append(i)
    
    return anomalies

anomalies = detect_ts_anomalies(signal, window_size)

# Plot
plt.figure(figsize=(15, 6))
plt.plot(t, signal, label='Signal')
plt.scatter(t[anomalies], signal[anomalies], 
           c='red', s=100, marker='x', label='Anomalies')
plt.xlabel('Time')
plt.ylabel('Value')
plt.title('Time Series Anomaly Detection')
plt.legend()
plt.show()

print(f"Detected {len(anomalies)} anomalies")

📊 Algorithm Comparison

Method             Speed            Scalability       Best For
Isolation Forest   Fast             High-dimensional  General purpose, large datasets
LOF                Medium           Small-medium      Varying density clusters
One-Class SVM      Slow             Small datasets    Clear boundary around normal
Autoencoder        Slow (training)  High-dimensional  Complex patterns, images
Z-Score            Very fast        Any size          Gaussian data, quick check
IQR                Very fast        Any size          Univariate, robust to outliers
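Since every detector above was fit on the same X, a quick tally shows how differently they behave; counts differ because each method defines "anomalous" in its own way:

# Compare anomaly counts across the detectors fit earlier in this section
results = {
    'Isolation Forest': (y_pred == -1).sum(),
    'LOF': (y_pred_lof == -1).sum(),
    'One-Class SVM': (y_pred_svm == -1).sum(),
    'Autoencoder': (y_pred_ae == -1).sum(),
    'Z-Score': y_pred_zscore.sum(),
    'IQR': y_pred_iqr.sum(),
}
for name, count in results.items():
    print(f"{name:>18}: {count} anomalies")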

💡 Best Practices

  • Scale features before distance- or kernel-based methods (LOF, One-Class SVM, autoencoders); see the pipeline sketch below.
  • Set contamination (or nu) from domain knowledge about the expected outlier rate rather than accepting the default.
  • If any labeled anomalies exist, hold them out to validate thresholds with precision and recall.
  • Train reconstruction-based models (autoencoders) on data that is as clean as possible.
  • Run more than one detector; points flagged by several methods are the strongest candidates.
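As a concrete example of the scaling advice, the scaler and detector can be chained so they always travel together; a minimal sketch using a scikit-learn pipeline:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM

# fit_predict scales the data, then fits the detector on the scaled features
pipeline = make_pipeline(StandardScaler(), OneClassSVM(kernel='rbf', gamma='auto', nu=0.1))
y_pred_pipe = pipeline.fit_predict(X)
print(f"Pipeline anomalies: {(y_pred_pipe == -1).sum()}")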

🎯 Key Takeaways

  • Anomaly detection flags points that deviate from the bulk of the data; scikit-learn detectors return -1 for outliers and 1 for inliers.
  • Isolation Forest is a strong general-purpose default; LOF handles varying-density clusters; One-Class SVM suits small datasets with a clear boundary; autoencoders capture complex high-dimensional patterns.
  • Z-score and IQR rules are fast first checks but assume simple (roughly Gaussian or univariate) structure.
  • The contamination / threshold setting controls how many points are flagged, so tune it to the expected outlier rate, not to a default.