Introduction to AI Content Moderation
Content moderation is one of the most critical applications of AI on the modern internet. Nearly every social platform, forum, and community relies on automated systems to detect and filter harmful content before it reaches users. From identifying hate speech and harassment to detecting explicit images and spam, AI content moderators work tirelessly behind the scenes.
In this comprehensive tutorial, you’ll build a production-ready content moderation system that handles both text and images. You’ll learn techniques used by major platforms to keep their communities safe while minimizing false positives that frustrate legitimate users.
What You’ll Build
By the end of this tutorial, you’ll have a complete moderation system that:
- Detects toxic language, hate speech, and harassment in text
- Identifies spam, promotional content, and manipulation attempts
- Analyzes images for explicit or inappropriate content
- Provides confidence scores and explanations for moderation decisions
- Supports configurable thresholds for different community standards
- Includes an API for easy integration with any application
Understanding Content Moderation
Categories of Harmful Content
Content moderation typically addresses several categories:
Toxicity and Hate Speech: Comments targeting individuals or groups based on protected characteristics
Harassment and Bullying: Personal attacks, threats, or intimidation
Explicit Content: Sexual, violent, or graphic material inappropriate for general audiences
Spam and Manipulation: Promotional content, scams, or coordinated inauthentic behavior
Misinformation: False or misleading claims about important topics
The Moderation Spectrum
Modern systems don’t just flag content as “good” or “bad.” They provide nuanced classifications (a minimal scoring sketch follows the list below):
- Allowed: Content meets community guidelines
- Needs Review: Borderline content requiring human assessment
- Auto-Remove: Clearly violating content removed immediately
- Shadow-Ban: Content visible only to author (for repeat offenders)
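To make the spectrum concrete, here is a minimal sketch of how a severity score could map onto these tiers. The function name, the cutoff values, and the repeat_offender flag are illustrative assumptions; the system we build below defines its own ModerationAction enum and thresholds.
# Illustrative sketch only: map a 0-1 severity score to a moderation tier.
# Cutoffs are assumptions, not the tutorial system's actual configuration.
def tier_for_severity(severity: float, repeat_offender: bool = False) -> str:
    if severity >= 0.8:
        return "auto_remove"    # clearly violating content
    if severity >= 0.5:
        return "needs_review"   # borderline, route to a human
    if repeat_offender and severity >= 0.3:
        return "shadow_ban"     # visible only to the author
    return "allowed"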
Prerequisites and Setup
Required Libraries
# Create virtual environment
python -m venv moderation_env
source moderation_env/bin/activate
# Core dependencies
pip install transformers torch torchvision
pip install detoxify # For toxicity detection
pip install openai # For advanced moderation
pip install pillow # Image processing
pip install flask flask-cors
pip install python-dotenv
pip install requests numpy pandas
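After installing, a quick sanity check confirms that PyTorch and Transformers import cleanly and whether a GPU is visible. A GPU is optional; the models used in this tutorial also run on CPU, just more slowly. The check_env.py name is only a suggestion, not part of the project tree below.
# check_env.py -- optional environment sanity check
import torch
import transformers

print("torch:", torch.__version__, "| CUDA available:", torch.cuda.is_available())
print("transformers:", transformers.__version__)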
Project Structure
content_moderator/
├── app.py # Flask API
├── text_moderator.py # Text moderation logic
├── image_moderator.py # Image moderation logic
├── models/
│ └── config.py # Model configurations
├── utils/
│ ├── preprocessing.py # Text/image preprocessing
│ └── scoring.py # Confidence scoring
├── templates/
│ └── dashboard.html # Moderation dashboard
├── tests/
│ └── test_moderation.py # Unit tests
└── requirements.txt
Step 1: Text Moderation Engine
We’ll combine three complementary signals: a multilingual toxicity classifier (Detoxify), a lightweight BERT spam classifier, and regex pattern filters for obvious violations:
# text_moderator.py
import re
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from detoxify import Detoxify
class ContentCategory(Enum):
SAFE = "safe"
TOXIC = "toxic"
HATE_SPEECH = "hate_speech"
HARASSMENT = "harassment"
SPAM = "spam"
EXPLICIT = "explicit"
VIOLENCE = "violence"
class ModerationAction(Enum):
ALLOW = "allow"
FLAG = "flag"
REVIEW = "review"
REMOVE = "remove"
@dataclass
class ModerationResult:
action: ModerationAction
categories: Dict[str, float]
primary_concern: Optional[str]
confidence: float
explanation: str
original_text: str
class TextModerator:
"""Multi-model text content moderation system."""
def __init__(self, device: str = None):
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
# Initialize Detoxify for toxicity detection
self.toxicity_model = Detoxify("multilingual", device=self.device)
# Load spam detection model
self.spam_tokenizer = AutoTokenizer.from_pretrained(
"mrm8488/bert-tiny-finetuned-sms-spam-detection"
)
self.spam_model = AutoModelForSequenceClassification.from_pretrained(
"mrm8488/bert-tiny-finetuned-sms-spam-detection"
).to(self.device)
# Thresholds for different categories
self.thresholds = {
"toxicity": 0.7,
"severe_toxicity": 0.5,
"obscene": 0.8,
"identity_attack": 0.6,
"insult": 0.7,
"threat": 0.5,
"sexual_explicit": 0.7,
"spam": 0.8
}
# Pattern-based filters (for obvious violations)
self.blocked_patterns = self._load_blocked_patterns()
def _load_blocked_patterns(self) -> List[re.Pattern]:
"""Load regex patterns for known harmful content."""
patterns = [
# URLs with suspicious TLDs
r"https?://[^\s]+\.(xyz|tk|ml|ga|cf)\b",
# Cryptocurrency scam patterns
r"(send|transfer)\s+\d+\s*(btc|eth|crypto)",
# Contact information harvesting
r"(dm|message|contact)\s+me\s+(for|to)\s+(earn|make|win)",
]
return [re.compile(p, re.IGNORECASE) for p in patterns]
def preprocess_text(self, text: str) -> str:
"""Clean and normalize text for analysis."""
# Collapse whitespace but keep Unicode: the toxicity model is multilingual,
# so accents, emoji, and non-Latin scripts must survive preprocessing.
text = " ".join(text.split())
return text.strip()
def detect_toxicity(self, text: str) -> Dict[str, float]:
"""Analyze text for toxic content using Detoxify."""
results = self.toxicity_model.predict(text)
return {k: float(v) for k, v in results.items()}
def detect_spam(self, text: str) -> float:
"""Detect spam content using BERT model."""
inputs = self.spam_tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=512
).to(self.device)
with torch.no_grad():
outputs = self.spam_model(**inputs)
probs = torch.softmax(outputs.logits, dim=1)
spam_prob = probs[0][1].item()
return spam_prob
def check_patterns(self, text: str) -> List[str]:
"""Check for blocked patterns."""
matches = []
for pattern in self.blocked_patterns:
if pattern.search(text):
matches.append(pattern.pattern)
return matches
def analyze_context(self, text: str) -> Dict[str, Any]:
"""Analyze contextual signals in the text."""
return {
"length": len(text),
"word_count": len(text.split()),
"caps_ratio": sum(1 for c in text if c.isupper()) / max(len(text), 1),
"exclamation_count": text.count("!"),
"question_count": text.count("?"),
"url_count": len(re.findall(r"https?://", text)),
"emoji_count": len(re.findall(r"[\U0001F600-\U0001F64F]", text)),
"repeated_chars": bool(re.search(r"(.)\1{4,}", text))
}
def calculate_severity(
self,
toxicity_scores: Dict[str, float],
spam_score: float,
context: Dict
) -> Tuple[float, Optional[str]]:
"""Calculate overall severity and primary concern."""
severity_factors = {
"severe_toxicity": toxicity_scores.get("severe_toxicity", 0) * 2.0,
"threat": toxicity_scores.get("threat", 0) * 1.8,
"identity_attack": toxicity_scores.get("identity_attack", 0) * 1.5,
"toxicity": toxicity_scores.get("toxicity", 0) * 1.0,
"insult": toxicity_scores.get("insult", 0) * 0.8,
"obscene": toxicity_scores.get("obscene", 0) * 0.7,
"spam": spam_score * 1.2
}
primary = max(severity_factors.items(), key=lambda x: x[1])
severity = min(1.0, sum(severity_factors.values()) / 3)
return severity, primary[0] if primary[1] > 0.3 else None
def determine_action(
self,
toxicity_scores: Dict[str, float],
spam_score: float,
pattern_matches: List[str],
severity: float
) -> ModerationAction:
"""Determine appropriate moderation action."""
if pattern_matches or toxicity_scores.get("severe_toxicity", 0) > 0.7:
return ModerationAction.REMOVE
if severity > 0.8:
return ModerationAction.REMOVE
if severity > 0.5:
return ModerationAction.REVIEW
if severity > 0.3:
return ModerationAction.FLAG
return ModerationAction.ALLOW
def generate_explanation(
self,
action: ModerationAction,
toxicity_scores: Dict[str, float],
spam_score: float,
primary_concern: Optional[str]
) -> str:
"""Generate human-readable explanation for the decision."""
if action == ModerationAction.ALLOW:
return "Content meets community guidelines."
explanations = {
"severe_toxicity": "Content contains severely toxic language.",
"toxicity": "Content may be perceived as toxic or harmful.",
"threat": "Content contains threatening language.",
"identity_attack": "Content attacks individuals based on identity.",
"insult": "Content contains insulting language.",
"obscene": "Content contains obscene language.",
"spam": "Content appears to be spam or promotional.",
}
base_explanation = explanations.get(
primary_concern,
"Content requires review for policy compliance."
)
if action == ModerationAction.REMOVE:
return f"Auto-removed: {base_explanation}"
elif action == ModerationAction.REVIEW:
return f"Flagged for review: {base_explanation}"
else:
return f"Warning: {base_explanation}"
def moderate(self, text: str) -> ModerationResult:
"""Perform comprehensive content moderation."""
cleaned_text = self.preprocess_text(text)
if not cleaned_text:
return ModerationResult(
action=ModerationAction.ALLOW,
categories={},
primary_concern=None,
confidence=1.0,
explanation="Empty content.",
original_text=text
)
toxicity_scores = self.detect_toxicity(cleaned_text)
spam_score = self.detect_spam(cleaned_text)
pattern_matches = self.check_patterns(cleaned_text)
context = self.analyze_context(cleaned_text)
severity, primary_concern = self.calculate_severity(
toxicity_scores, spam_score, context
)
action = self.determine_action(
toxicity_scores, spam_score, pattern_matches, severity
)
explanation = self.generate_explanation(
action, toxicity_scores, spam_score, primary_concern
)
categories = {**toxicity_scores, "spam": spam_score}
return ModerationResult(
action=action,
categories=categories,
primary_concern=primary_concern,
confidence=severity,
explanation=explanation,
original_text=text
)
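Before moving on to images, it helps to sanity-check the text engine from the command line. The snippet below is a hypothetical usage sketch rather than part of the module above; the first run downloads the Detoxify and spam model weights, and the exact scores depend on the models.
# Usage sketch (run as a separate script; example text is illustrative).
from text_moderator import TextModerator

if __name__ == "__main__":
    moderator = TextModerator()
    result = moderator.moderate("Click here to win $1000! DM me now!")
    print(result.action.value)      # e.g. "flag" or "review"
    print(result.primary_concern)   # e.g. "spam"
    print(result.explanation)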
Step 2: Image Moderation Engine
Now let’s add image moderation capabilities:
# image_moderator.py
import io
import base64
from typing import Dict, Optional, List
from dataclasses import dataclass
from PIL import Image
import torch
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
import requests
import os
@dataclass
class ImageModerationResult:
is_safe: bool
categories: Dict[str, float]
primary_concern: Optional[str]
confidence: float
explanation: str
class ImageModerator:
"""AI-powered image content moderation."""
def __init__(self, device: str = None):
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
# Load NSFW detection model
self.nsfw_extractor = AutoFeatureExtractor.from_pretrained(
"Falconsai/nsfw_image_detection"
)
self.nsfw_model = AutoModelForImageClassification.from_pretrained(
"Falconsai/nsfw_image_detection"
).to(self.device)
# Category thresholds
self.thresholds = {
"nsfw": 0.7,
"violence": 0.6,
"hate_symbols": 0.5
}
def load_image(self, image_source: str) -> Image.Image:
"""Load image from file path, URL, or base64."""
if image_source.startswith("http"):
response = requests.get(image_source, timeout=10)
image = Image.open(io.BytesIO(response.content))
elif image_source.startswith("data:image"):
base64_data = image_source.split(",")[1]
image_data = base64.b64decode(base64_data)
image = Image.open(io.BytesIO(image_data))
else:
image = Image.open(image_source)
return image.convert("RGB")
def detect_nsfw(self, image: Image.Image) -> Dict[str, float]:
"""Detect NSFW content in image."""
inputs = self.nsfw_extractor(images=image, return_tensors="pt")
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
outputs = self.nsfw_model(**inputs)
probs = torch.softmax(outputs.logits, dim=1)
labels = self.nsfw_model.config.id2label
results = {}
for idx, prob in enumerate(probs[0]):
label = labels[idx]
results[label] = float(prob)
return results
def analyze_image_properties(self, image: Image.Image) -> Dict:
"""Analyze basic image properties for additional signals."""
width, height = image.size
return {
"width": width,
"height": height,
"aspect_ratio": width / height,
"is_small": width < 100 or height < 100,
"is_large": width > 4000 or height > 4000,
}
def moderate(self, image_source: str) -> ImageModerationResult:
"""Perform comprehensive image moderation."""
try:
image = self.load_image(image_source)
except Exception as e:
return ImageModerationResult(
is_safe=False,
categories={},
primary_concern="load_error",
confidence=0.0,
explanation=f"Failed to load image: {str(e)}"
)
nsfw_scores = self.detect_nsfw(image)
properties = self.analyze_image_properties(image)
nsfw_score = nsfw_scores.get("nsfw", 0)
is_safe = nsfw_score < self.thresholds["nsfw"]
primary_concern = None
if not is_safe:
primary_concern = max(nsfw_scores.items(), key=lambda x: x[1])[0]
if is_safe:
explanation = "Image content is appropriate."
else:
explanation = f"Image flagged for {primary_concern} content (confidence: {nsfw_score:.1%})."
return ImageModerationResult(
is_safe=is_safe,
categories=nsfw_scores,
primary_concern=primary_concern,
confidence=nsfw_score,
explanation=explanation
)
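A similar quick check works for the image engine. The file path below is a placeholder assumption; substitute any local JPEG or PNG, or an image URL.
# Usage sketch (path is a placeholder; substitute any local image).
from image_moderator import ImageModerator

if __name__ == "__main__":
    moderator = ImageModerator()
    result = moderator.moderate("sample_upload.jpg")
    print(result.is_safe, f"{result.confidence:.2f}")
    print(result.explanation)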
Step 3: Flask API
Create a REST API for the moderation system:
# app.py
import os
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
from text_moderator import TextModerator
from image_moderator import ImageModerator
load_dotenv()
app = Flask(__name__)
CORS(app)
app.config["UPLOAD_FOLDER"] = "uploads"
app.config["MAX_CONTENT_LENGTH"] = 10 * 1024 * 1024 # 10MB
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
# Initialize moderators
text_moderator = TextModerator()
image_moderator = ImageModerator()
@app.route("/")
def index():
return render_template("dashboard.html")
@app.route("/api/moderate/text", methods=["POST"])
def moderate_text():
"""Moderate text content."""
data = request.get_json(silent=True) or {}
text = data.get("text", "").strip()
if not text:
return jsonify({"error": "No text provided"}), 400
result = text_moderator.moderate(text)
return jsonify({
"action": result.action.value,
"is_safe": result.action.value == "allow",
"confidence": result.confidence,
"summary": result.explanation,
"categories": result.categories
})
@app.route("/api/moderate/image", methods=["POST"])
def moderate_image():
"""Moderate image content."""
if "image" in request.files:
file = request.files["image"]
filename = secure_filename(file.filename)
filepath = os.path.join(app.config["UPLOAD_FOLDER"], filename)
file.save(filepath)
image_source = filepath
elif request.is_json:
data = request.get_json()
image_source = data.get("image_url", "")
else:
return jsonify({"error": "No image provided"}), 400
result = image_moderator.moderate(image_source)
return jsonify({
"action": "allow" if result.is_safe else "remove",
"is_safe": result.is_safe,
"confidence": result.confidence,
"summary": result.explanation,
"categories": result.categories
})
@app.route("/api/health")
def health():
return jsonify({"status": "healthy"})
if __name__ == "__main__":
app.run(debug=True, port=5000)
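With the server running locally, you can exercise the endpoints from any HTTP client. Here is a minimal sketch using the requests library; the example text and image URL are placeholders.
# client_example.py -- minimal sketch for calling the local API
import requests

BASE_URL = "http://localhost:5000"

# Text moderation
resp = requests.post(f"{BASE_URL}/api/moderate/text", json={"text": "Hello there!"})
print(resp.json())  # e.g. {"action": "allow", "is_safe": true, ...}

# Image moderation by URL (placeholder URL)
resp = requests.post(
    f"{BASE_URL}/api/moderate/image",
    json={"image_url": "https://example.com/photo.jpg"},
)
print(resp.json())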
Production Considerations
Rate Limiting
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["1000 per hour", "100 per minute"]
)
@app.route("/api/moderate/text", methods=["POST"])
@limiter.limit("50 per minute")
def moderate_text():
# ... implementation
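Internal endpoints such as the health check can be exempted so that monitoring traffic does not count against the limits. A brief sketch using flask-limiter's exempt decorator, shown as a variation of the existing /api/health route:
@app.route("/api/health")
@limiter.exempt
def health():
    # Exempt from all rate limits so monitoring probes are never throttled.
    return jsonify({"status": "healthy"})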
Caching for Repeated Content
import hashlib
from functools import lru_cache
@lru_cache(maxsize=10000)
def cached_moderate(content_hash: str, text: str):
return text_moderator.moderate(text)
def moderate_with_cache(text: str):
content_hash = hashlib.md5(text.encode()).hexdigest()
return cached_moderate(content_hash, text)
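To benefit from the cache, the text endpoint should call the wrapper instead of the moderator directly. A one-line sketch of the change in app.py; keep in mind that lru_cache lives in process memory, so each worker process maintains its own cache.
# In moderate_text(), swap the direct call for the cached wrapper:
result = moderate_with_cache(text)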
Testing Your Moderation System
# test_moderation.py
import pytest
from text_moderator import TextModerator, ModerationAction
@pytest.fixture
def moderator():
return TextModerator()
def test_safe_content(moderator):
result = moderator.moderate("Hello, how are you today?")
assert result.action == ModerationAction.ALLOW
def test_toxic_content(moderator):
result = moderator.moderate("You are an idiot and I hate you!")
assert result.action in [ModerationAction.FLAG, ModerationAction.REVIEW]
def test_empty_content(moderator):
result = moderator.moderate("")
assert result.action == ModerationAction.ALLOW
def test_spam_content(moderator):
result = moderator.moderate("Click here to win $1000! DM me now!")
assert result.categories.get("spam", 0) > 0.5
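Model outputs vary slightly across versions and hardware, so tests on benign content are less brittle when they only assert that nothing gets auto-removed. A parametrized sketch; the example sentences are illustrative:
@pytest.mark.parametrize("text", [
    "Thanks for the detailed answer!",
    "Does anyone know a good Python tutorial?",
])
def test_benign_variants(moderator, text):
    # Only assert the content is not removed; exact scores depend on the models.
    result = moderator.moderate(text)
    assert result.action != ModerationAction.REMOVE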
Conclusion
You've built an AI content moderation system that combines multiple models for more reliable detection of harmful text and images. The system demonstrates production patterns including multi-model analysis, configurable thresholds, pattern-based fast paths, and routing borderline content to human review.
Key takeaways:
- Multiple models provide better coverage than single-model approaches
- Confidence scores enable nuanced moderation decisions
- Pattern matching catches obvious violations quickly
- Human review queues are essential for borderline cases
- Regular model updates keep pace with evolving harmful content
This foundation can be extended with features like language detection for multilingual support, user reputation scoring, appeal workflows, and integration with community reporting systems.
