Build an AI Content Moderator: Complete Python Tutorial for Text and Image Moderation

👤 By harshith
📅 Dec 11, 2025

Introduction to AI Content Moderation

Content moderation is one of the most widely deployed applications of AI on the modern internet. Large social platforms, forums, and communities rely on automated systems to detect and filter harmful content before it reaches users, from hate speech and harassment to explicit images and spam.

In this tutorial, you’ll build a content moderation system that handles both text and images, expose it through a Flask API, and review what it takes to harden it for production. Along the way you’ll see how to keep a community safe while minimizing the false positives that frustrate legitimate users.

What You’ll Build

By the end of this tutorial, you’ll have a complete moderation system that:

  • Detects toxic language, hate speech, and harassment in text
  • Identifies spam, promotional content, and manipulation attempts
  • Analyzes images for explicit or inappropriate content
  • Provides confidence scores and explanations for moderation decisions
  • Supports configurable thresholds for different community standards
  • Includes an API for easy integration with any application

Understanding Content Moderation

Categories of Harmful Content

Content moderation typically addresses several categories:

Toxicity and Hate Speech: Comments targeting individuals or groups based on protected characteristics

Harassment and Bullying: Personal attacks, threats, or intimidation

Explicit Content: Sexual, violent, or graphic material inappropriate for general audiences

Spam and Manipulation: Promotional content, scams, or coordinated inauthentic behavior

Misinformation: False or misleading claims about important topics

The Moderation Spectrum

Modern systems don’t just flag content as “good” or “bad.” They provide nuanced classifications:

  • Allowed: Content meets community guidelines
  • Needs Review: Borderline content requiring human assessment
  • Auto-Remove: Clearly violating content removed immediately
  • Shadow-Ban: Content visible only to its author (typically reserved for repeat offenders)
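
One way to picture the spectrum is as a pair of cutoffs on a severity score between 0 and 1. The sketch below is illustrative only: the `ThresholdProfile` class, the profile names, and the cutoff values are assumptions, not part of the system built later in this tutorial. Shadow-banning is left out because it depends on a user's history rather than on a single piece of content.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ThresholdProfile:
    """Cutoffs for one community (hypothetical helper for illustration)."""
    review: float  # at or above this score, queue for human review
    remove: float  # at or above this score, remove automatically


# Assumed example profiles; real values would come from tuning on labeled data.
PROFILES = {
    "general": ThresholdProfile(review=0.5, remove=0.8),
    "kids": ThresholdProfile(review=0.3, remove=0.6),
}


def classify(severity: float, profile: ThresholdProfile) -> str:
    """Map a severity score in [0, 1] to a point on the moderation spectrum."""
    if severity >= profile.remove:
        return "auto_remove"
    if severity >= profile.review:
        return "needs_review"
    return "allowed"


# The same score can land in different buckets depending on the community.
print(classify(0.65, PROFILES["general"]))
print(classify(0.65, PROFILES["kids"]))
```

Keeping the cutoffs in a small data object, rather than hard-coding them in the decision logic, is what makes it practical to run stricter rules for one community than another.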

Prerequisites and Setup

Required Libraries

# Create virtual environment
python -m venv moderation_env
source moderation_env/bin/activate

# Core dependencies
pip install transformers torch torchvision
pip install detoxify  # For toxicity detection
pip install openai    # For advanced moderation
pip install pillow    # Image processing
pip install flask flask-cors
pip install python-dotenv
pip install requests numpy pandas
pip install pytest flask-limiter  # Used by the tests and rate limiting below

Project Structure

content_moderator/
├── app.py                  # Flask API
├── text_moderator.py       # Text moderation logic
├── image_moderator.py      # Image moderation logic
├── models/
│   └── config.py           # Model configurations
├── utils/
│   ├── preprocessing.py    # Text/image preprocessing
│   └── scoring.py          # Confidence scoring
├── templates/
│   └── dashboard.html      # Moderation dashboard
├── tests/
│   └── test_moderation.py  # Unit tests
└── requirements.txt

Step 1: Text Moderation Engine

We’ll combine three signals for text analysis: toxicity scores from Detoxify, a BERT-based spam classifier, and regex filters for obvious violations:

# text_moderator.py
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from detoxify import Detoxify

class ContentCategory(Enum):
    SAFE = "safe"
    TOXIC = "toxic"
    HATE_SPEECH = "hate_speech"
    HARASSMENT = "harassment"
    SPAM = "spam"
    EXPLICIT = "explicit"
    VIOLENCE = "violence"

class ModerationAction(Enum):
    ALLOW = "allow"
    FLAG = "flag"
    REVIEW = "review"
    REMOVE = "remove"

@dataclass
class ModerationResult:
    action: ModerationAction
    categories: Dict[str, float]
    primary_concern: Optional[str]
    confidence: float
    explanation: str
    original_text: str

class TextModerator:
    """Multi-model text content moderation system."""

    def __init__(self, device: Optional[str] = None):
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

        # Initialize Detoxify for toxicity detection
        self.toxicity_model = Detoxify("multilingual", device=self.device)

        # Load spam detection model
        self.spam_tokenizer = AutoTokenizer.from_pretrained(
            "mrm8488/bert-tiny-finetuned-sms-spam-detection"
        )
        self.spam_model = AutoModelForSequenceClassification.from_pretrained(
            "mrm8488/bert-tiny-finetuned-sms-spam-detection"
        ).to(self.device)

        # Thresholds for different categories
        self.thresholds = {
            "toxicity": 0.7,
            "severe_toxicity": 0.5,
            "obscene": 0.8,
            "identity_attack": 0.6,
            "insult": 0.7,
            "threat": 0.5,
            "sexual_explicit": 0.7,
            "spam": 0.8
        }

        # Pattern-based filters (for obvious violations)
        self.blocked_patterns = self._load_blocked_patterns()

    def _load_blocked_patterns(self) -> List[re.Pattern]:
        """Load regex patterns for known harmful content."""
        patterns = [
            # URLs with suspicious TLDs
            r"https?://[^\s]+\.(xyz|tk|ml|ga|cf)\b",
            # Cryptocurrency scam patterns
            r"(send|transfer)\s+\d+\s*(btc|eth|crypto)",
            # Contact information harvesting
            r"(dm|message|contact)\s+me\s+(for|to)\s+(earn|make|win)",
        ]
        return [re.compile(p, re.IGNORECASE) for p in patterns]

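    def preprocess_text(self, text: str) -> str:
        """Clean and normalize text for analysis."""
        import unicodedata  # local import keeps this method self-contained
        # NFKC folds look-alike characters (e.g. full-width letters) without
        # discarding non-ASCII text, which the multilingual model and the
        # emoji counter in analyze_context both need.
        text = unicodedata.normalize("NFKC", text)
        text = " ".join(text.split())
        return text.strip()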

    def detect_toxicity(self, text: str) -> Dict[str, float]:
        """Analyze text for toxic content using Detoxify."""
        results = self.toxicity_model.predict(text)
        return {k: float(v) for k, v in results.items()}

    def detect_spam(self, text: str) -> float:
        """Detect spam content using BERT model."""
        inputs = self.spam_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=512
        ).to(self.device)

        with torch.no_grad():
            outputs = self.spam_model(**inputs)
            probs = torch.softmax(outputs.logits, dim=1)
            spam_prob = probs[0][1].item()

        return spam_prob

    def check_patterns(self, text: str) -> List[str]:
        """Check for blocked patterns."""
        matches = []
        for pattern in self.blocked_patterns:
            if pattern.search(text):
                matches.append(pattern.pattern)
        return matches

    def analyze_context(self, text: str) -> Dict[str, object]:
        """Analyze contextual signals in the text."""
        return {
            "length": len(text),
            "word_count": len(text.split()),
            "caps_ratio": sum(1 for c in text if c.isupper()) / max(len(text), 1),
            "exclamation_count": text.count("!"),
            "question_count": text.count("?"),
            "url_count": len(re.findall(r"https?://", text)),
            "emoji_count": len(re.findall(r"[\U0001F600-\U0001F64F]", text)),
            "repeated_chars": bool(re.search(r"(.)\1{4,}", text))
        }

    def calculate_severity(
        self,
        toxicity_scores: Dict[str, float],
        spam_score: float,
        context: Dict
    ) -> Tuple[float, Optional[str]]:
        """Calculate overall severity and primary concern."""

        severity_factors = {
            "severe_toxicity": toxicity_scores.get("severe_toxicity", 0) * 2.0,
            "threat": toxicity_scores.get("threat", 0) * 1.8,
            "identity_attack": toxicity_scores.get("identity_attack", 0) * 1.5,
            "toxicity": toxicity_scores.get("toxicity", 0) * 1.0,
            "insult": toxicity_scores.get("insult", 0) * 0.8,
            "obscene": toxicity_scores.get("obscene", 0) * 0.7,
            "spam": spam_score * 1.2
        }

        primary = max(severity_factors.items(), key=lambda x: x[1])
        severity = min(1.0, sum(severity_factors.values()) / 3)

        # primary_concern is None when no weighted score exceeds 0.3
        return severity, (primary[0] if primary[1] > 0.3 else None)

    def determine_action(
        self,
        toxicity_scores: Dict[str, float],
        spam_score: float,
        pattern_matches: List[str],
        severity: float
    ) -> ModerationAction:
        """Determine appropriate moderation action."""

        if pattern_matches or toxicity_scores.get("severe_toxicity", 0) > 0.7:
            return ModerationAction.REMOVE

        if severity > 0.8:
            return ModerationAction.REMOVE

        if severity > 0.5:
            return ModerationAction.REVIEW

        if severity > 0.3:
            return ModerationAction.FLAG

        return ModerationAction.ALLOW

    def generate_explanation(
        self,
        action: ModerationAction,
        toxicity_scores: Dict[str, float],
        spam_score: float,
        primary_concern: Optional[str]
    ) -> str:
        """Generate human-readable explanation for the decision."""

        if action == ModerationAction.ALLOW:
            return "Content meets community guidelines."

        explanations = {
            "severe_toxicity": "Content contains severely toxic language.",
            "toxicity": "Content may be perceived as toxic or harmful.",
            "threat": "Content contains threatening language.",
            "identity_attack": "Content attacks individuals based on identity.",
            "insult": "Content contains insulting language.",
            "obscene": "Content contains obscene language.",
            "spam": "Content appears to be spam or promotional.",
        }

        base_explanation = explanations.get(
            primary_concern,
            "Content requires review for policy compliance."
        )

        if action == ModerationAction.REMOVE:
            return f"Auto-removed: {base_explanation}"
        elif action == ModerationAction.REVIEW:
            return f"Flagged for review: {base_explanation}"
        else:
            return f"Warning: {base_explanation}"

    def moderate(self, text: str) -> ModerationResult:
        """Perform comprehensive content moderation."""

        cleaned_text = self.preprocess_text(text)

        if not cleaned_text:
            return ModerationResult(
                action=ModerationAction.ALLOW,
                categories={},
                primary_concern=None,
                confidence=1.0,
                explanation="Empty content.",
                original_text=text
            )

        toxicity_scores = self.detect_toxicity(cleaned_text)
        spam_score = self.detect_spam(cleaned_text)
        pattern_matches = self.check_patterns(cleaned_text)
        context = self.analyze_context(cleaned_text)

        severity, primary_concern = self.calculate_severity(
            toxicity_scores, spam_score, context
        )

        action = self.determine_action(
            toxicity_scores, spam_score, pattern_matches, severity
        )

        explanation = self.generate_explanation(
            action, toxicity_scores, spam_score, primary_concern
        )

        categories = {**toxicity_scores, "spam": spam_score}

        return ModerationResult(
            action=action,
            categories=categories,
            primary_concern=primary_concern,
            confidence=severity,
            explanation=explanation,
            original_text=text
        )
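
To make the severity arithmetic concrete, here is a worked example that reuses the weights from `calculate_severity`. The standalone function `severity_from_scores` and the input scores are made up for illustration; actual Detoxify outputs will differ.

```python
from typing import Dict, Optional, Tuple

# Weights copied from TextModerator.calculate_severity.
WEIGHTS = {
    "severe_toxicity": 2.0,
    "threat": 1.8,
    "identity_attack": 1.5,
    "toxicity": 1.0,
    "insult": 0.8,
    "obscene": 0.7,
    "spam": 1.2,
}


def severity_from_scores(scores: Dict[str, float]) -> Tuple[float, Optional[str]]:
    """Standalone copy of the severity calculation (illustrative helper)."""
    weighted = {name: scores.get(name, 0.0) * w for name, w in WEIGHTS.items()}
    primary, top = max(weighted.items(), key=lambda kv: kv[1])
    # The weighted sum is divided by 3 and capped at 1.0.
    severity = min(1.0, sum(weighted.values()) / 3)
    return severity, (primary if top > 0.3 else None)


# Hypothetical scores for a rude but non-threatening comment.
example = {"toxicity": 0.9, "insult": 0.85, "obscene": 0.2, "spam": 0.05}
severity, concern = severity_from_scores(example)

# Weighted terms: 0.9 + 0.68 + 0.14 + 0.06 = 1.78, and 1.78 / 3 is about 0.593.
print(round(severity, 3), concern)
```

With these inputs the severity falls between 0.5 and 0.8, so `determine_action` would return `REVIEW`. Because the terms are summed, several moderate scores can push content over a cutoff even when no single category is extreme.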

Step 2: Image Moderation Engine

Now let’s add image moderation capabilities:

# image_moderator.py
import io
import base64
from typing import Dict, Optional, List
from dataclasses import dataclass
from PIL import Image
import torch
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
import requests
import os

@dataclass
class ImageModerationResult:
    is_safe: bool
    categories: Dict[str, float]
    primary_concern: Optional[str]
    confidence: float
    explanation: str

class ImageModerator:
    """AI-powered image content moderation."""

    def __init__(self, device: Optional[str] = None):
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

        # Load NSFW detection model
        self.nsfw_extractor = AutoFeatureExtractor.from_pretrained(
            "Falconsai/nsfw_image_detection"
        )
        self.nsfw_model = AutoModelForImageClassification.from_pretrained(
            "Falconsai/nsfw_image_detection"
        ).to(self.device)

        # Category thresholds
        self.thresholds = {
            "nsfw": 0.7,
            "violence": 0.6,
            "hate_symbols": 0.5
        }

    def load_image(self, image_source: str) -> Image.Image:
        """Load image from file path, URL, or base64."""

        if image_source.startswith("http"):
            response = requests.get(image_source, timeout=10)
            response.raise_for_status()  # fail early on 4xx/5xx instead of parsing an error page
            image = Image.open(io.BytesIO(response.content))
        elif image_source.startswith("data:image"):
            base64_data = image_source.split(",")[1]
            image_data = base64.b64decode(base64_data)
            image = Image.open(io.BytesIO(image_data))
        else:
            image = Image.open(image_source)

        return image.convert("RGB")

    def detect_nsfw(self, image: Image.Image) -> Dict[str, float]:
        """Detect NSFW content in image."""

        inputs = self.nsfw_extractor(images=image, return_tensors="pt")
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = self.nsfw_model(**inputs)
            probs = torch.softmax(outputs.logits, dim=1)

        labels = self.nsfw_model.config.id2label
        results = {}

        for idx, prob in enumerate(probs[0]):
            label = labels[idx]
            results[label] = float(prob)

        return results

    def analyze_image_properties(self, image: Image.Image) -> Dict:
        """Analyze basic image properties for additional signals."""

        width, height = image.size

        return {
            "width": width,
            "height": height,
            "aspect_ratio": width / height,
            "is_small": width < 100 or height < 100,
            "is_large": width > 4000 or height > 4000,
        }

    def moderate(self, image_source: str) -> ImageModerationResult:
        """Perform comprehensive image moderation."""

        try:
            image = self.load_image(image_source)
        except Exception as e:
            return ImageModerationResult(
                is_safe=False,
                categories={},
                primary_concern="load_error",
                confidence=0.0,
                explanation=f"Failed to load image: {str(e)}"
            )

        nsfw_scores = self.detect_nsfw(image)
        properties = self.analyze_image_properties(image)

        nsfw_score = nsfw_scores.get("nsfw", 0)
        is_safe = nsfw_score < self.thresholds["nsfw"]

        primary_concern = None
        if not is_safe:
            primary_concern = max(nsfw_scores.items(), key=lambda x: x[1])[0]

        if is_safe:
            explanation = "Image content is appropriate."
        else:
            explanation = f"Image flagged for {primary_concern} content (confidence: {nsfw_score:.1%})."

        return ImageModerationResult(
            is_safe=is_safe,
            categories=nsfw_scores,
            primary_concern=primary_concern,
            confidence=nsfw_score,
            explanation=explanation
        )
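
The base64 branch of `load_image` trusts that the input has the form `data:image/...;base64,<payload>` and simply splits on the first comma. A stricter parser might look like the sketch below. The function name `parse_image_data_uri`, the allowed MIME types, and the size limit are assumptions chosen for illustration, and data URIs with extra parameters (such as a charset) are rejected for simplicity.

```python
import base64
import binascii
from typing import Tuple

# Assumed allow-list; adjust to the formats your platform accepts.
ALLOWED_MIME = {"image/png", "image/jpeg", "image/gif", "image/webp"}


def parse_image_data_uri(uri: str, max_bytes: int = 10 * 1024 * 1024) -> Tuple[str, bytes]:
    """Validate a base64 image data URI and return (mime_type, raw_bytes)."""
    header, sep, payload = uri.partition(",")
    if not sep or not header.startswith("data:") or not header.endswith(";base64"):
        raise ValueError("expected a base64 data URI")

    mime = header[len("data:"):-len(";base64")].lower()
    if mime not in ALLOWED_MIME:
        raise ValueError(f"unsupported image type: {mime!r}")

    try:
        # validate=True rejects characters outside the base64 alphabet.
        raw = base64.b64decode(payload, validate=True)
    except binascii.Error as exc:
        raise ValueError("payload is not valid base64") from exc

    if len(raw) > max_bytes:
        raise ValueError("image exceeds size limit")
    return mime, raw
```

The returned bytes can be passed to `Image.open(io.BytesIO(raw))` exactly as `load_image` does. Note that the declared MIME type is only a claim made by the sender, so the decoded bytes should still be treated as untrusted.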

Step 3: Flask API

Create a REST API for the moderation system:

# app.py
import os
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
from werkzeug.utils import secure_filename
from dotenv import load_dotenv

from text_moderator import TextModerator
from image_moderator import ImageModerator

load_dotenv()

app = Flask(__name__)
CORS(app)

app.config["UPLOAD_FOLDER"] = "uploads"
app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024  # 10MB

os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)

# Initialize moderators
text_moderator = TextModerator()
image_moderator = ImageModerator()

@app.route("/")
def index():
    return render_template("dashboard.html")

@app.route("/api/moderate/text", methods=["POST"])
def moderate_text():
    """Moderate text content."""
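    # silent=True returns None for a missing or malformed JSON body instead of raising
    data = request.get_json(silent=True) or {}
    text = str(data.get("text") or "").strip()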

    if not text:
        return jsonify({"error": "No text provided"}), 400

    result = text_moderator.moderate(text)

    return jsonify({
        "action": result.action.value,
        "is_safe": result.action.value == "allow",
        "confidence": result.confidence,
        "summary": result.explanation,
        "categories": result.categories
    })

@app.route("/api/moderate/image", methods=["POST"])
def moderate_image():
    """Moderate image content."""

    if "image" in request.files:
        file = request.files["image"]
        filename = secure_filename(file.filename)
        filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename)
        file.save(filepath)
        image_source = filepath
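    elif request.is_json:
        data = request.get_json(silent=True) or {}
        image_source = data.get("image_url", "")
        # Accept only URLs or data URIs here; otherwise a client could
        # make the server open arbitrary local file paths.
        if not str(image_source).startswith(("http://", "https://", "data:image")):
            return jsonify({"error": "No valid image URL provided"}), 400
    else:
        return jsonify({"error": "No image provided"}), 400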

    result = image_moderator.moderate(image_source)

    return jsonify({
        "action": "allow" if result.is_safe else "remove",
        "is_safe": result.is_safe,
        "confidence": result.confidence,
        "summary": result.explanation,
        "categories": result.categories
    })

@app.route("/api/health")
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    # Keep debug mode off unless explicitly enabled for local development
    app.run(debug=os.getenv("FLASK_DEBUG") == "1", port=5000)
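
To call the text endpoint from another Python process, a small client along these lines should work. It uses only the standard library. The helper names `build_text_request` and `moderate_text_remote` are hypothetical, and the base URL assumes the development server above is running locally on port 5000.

```python
import json
import urllib.request


def build_text_request(text: str, base_url: str = "http://localhost:5000") -> urllib.request.Request:
    """Build the POST request for /api/moderate/text without sending it."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/moderate/text",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def moderate_text_remote(text: str, base_url: str = "http://localhost:5000") -> dict:
    """Send the request and decode the JSON response.

    urlopen raises urllib.error.HTTPError for non-2xx responses, such as
    the 400 returned when no text is provided.
    """
    request = build_text_request(text, base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `moderate_text_remote("Hello there")` returns a dictionary with the `action`, `is_safe`, `confidence`, `summary`, and `categories` keys produced by the endpoint.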

Production Considerations

Rate Limiting

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["1000 per hour", "100 per minute"]
)

@app.route("/api/moderate/text", methods=["POST"])
@limiter.limit("50 per minute")
def moderate_text():
    ...  # same implementation as in app.py

Caching for Repeated Content

from functools import lru_cache

@lru_cache(maxsize=10000)
def cached_moderate(text: str):
    # lru_cache already keys on the text, so a separate hash adds nothing
    return text_moderator.moderate(text)

def moderate_with_cache(text: str):
    # Identical submissions reuse the first result instead of re-running the models
    return cached_moderate(text)
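
If holding thousands of full texts in memory is a concern, one variation is to key the cache on a hash of the whitespace-normalized text and store only the result. The sketch below is one possible approach rather than a recommendation: the `HashedLRUCache` class is hypothetical, and `compute` stands in for a call such as `text_moderator.moderate`.

```python
import hashlib
from collections import OrderedDict
from typing import Any, Callable


class HashedLRUCache:
    """Small LRU cache keyed by a SHA-256 digest of normalized text."""

    def __init__(self, compute: Callable[[str], Any], maxsize: int = 10000):
        self._compute = compute
        self._maxsize = maxsize
        self._store: "OrderedDict[str, Any]" = OrderedDict()
        self.hits = 0

    @staticmethod
    def key_for(text: str) -> str:
        # Collapse whitespace so trivially different copies share one entry.
        normalized = " ".join(text.split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, text: str) -> Any:
        key = self.key_for(text)
        if key in self._store:
            self._store.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return self._store[key]
        result = self._compute(text)
        self._store[key] = result
        if len(self._store) > self._maxsize:
            self._store.popitem(last=False)  # evict the least recently used entry
        return result
```

One caveat: a cached `ModerationResult` carries the `original_text` of whichever submission populated the entry, so callers that need the exact original should keep it separately.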

Testing Your Moderation System

# test_moderation.py
import pytest
from text_moderator import TextModerator, ModerationAction

@pytest.fixture
def moderator():
    return TextModerator()

def test_safe_content(moderator):
    result = moderator.moderate("Hello, how are you today?")
    assert result.action == ModerationAction.ALLOW

def test_toxic_content(moderator):
    result = moderator.moderate("You are an idiot and I hate you!")
    # The exact action depends on model scores, so only assert it is not allowed
    assert result.action != ModerationAction.ALLOW

def test_empty_content(moderator):
    result = moderator.moderate("")
    assert result.action == ModerationAction.ALLOW

def test_spam_content(moderator):
    result = moderator.moderate("Click here to win $1000! DM me now!")
    assert result.categories.get("spam", 0) > 0.5
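
The tests above load both models, which makes them slow. The regex filters can be checked on their own in milliseconds. The sketch below copies the three patterns from `_load_blocked_patterns` into a standalone module. The helper `matches_blocked_pattern` and the sample strings are made up for illustration.

```python
import re

# Patterns copied from TextModerator._load_blocked_patterns.
BLOCKED_PATTERNS = [
    re.compile(p, re.IGNORECASE)
    for p in (
        r"https?://[^\s]+\.(xyz|tk|ml|ga|cf)\b",
        r"(send|transfer)\s+\d+\s*(btc|eth|crypto)",
        r"(dm|message|contact)\s+me\s+(for|to)\s+(earn|make|win)",
    )
]


def matches_blocked_pattern(text: str) -> bool:
    """Return True if any blocked pattern occurs in the text."""
    return any(p.search(text) for p in BLOCKED_PATTERNS)


def test_suspicious_tld_is_blocked():
    assert matches_blocked_pattern("Visit http://free-prizes.xyz now")


def test_crypto_scam_is_blocked():
    assert matches_blocked_pattern("Send 2 BTC to double your money")


def test_ordinary_link_is_not_blocked():
    assert not matches_blocked_pattern("Docs are at https://example.com/guide")


def test_dm_me_now_is_not_covered():
    # The third pattern requires "me for/to earn/make/win", so this phrasing
    # slips past the regexes and must be caught by the spam model instead.
    assert not matches_blocked_pattern("Click here to win $1000! DM me now!")
```

Since any pattern match leads straight to `REMOVE` in `determine_action`, tests like the third one, which confirm that ordinary content is not matched, are as important as the positive cases.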

Conclusion

You’ve built an AI content moderation system that combines multiple models to detect harmful content. It demonstrates several production patterns: a multi-model ensemble, configurable thresholds, and a review action that routes borderline content to human moderators.

Key takeaways:

  • Multiple models provide better coverage than single-model approaches
  • Confidence scores enable nuanced moderation decisions
  • Pattern matching catches obvious violations quickly
  • Human review queues are essential for borderline cases
  • Regular model updates keep pace with evolving harmful content

This foundation can be extended with features like language detection for multilingual support, user reputation scoring, appeal workflows, and integration with community reporting systems.
