app.py explanation

How app.py is made

Complete Guide to app.py - Weapon Detection System

This is a complete beginner's guide to understanding every single line of the weapon detection system's main application file. The file contains over 1,600 lines of Python code that creates a sophisticated web API for detecting weapons in images and videos.


1. File Imports and Setup

import os
import cv2
import numpy as np
from flask import Flask, request, jsonify
from flask_cors import CORS
from ultralytics import YOLO
import smtplib
import time

What this does:

  • os: Operating system interface (file paths, directories)
  • cv2: OpenCV library for computer vision and video processing
  • numpy: Mathematical operations on arrays (images are arrays)
  • flask: Web framework to create the API server
  • flask_cors: Enables Cross-Origin Resource Sharing (allows frontend to connect)
  • ultralytics YOLO: The AI model for object detection
  • smtplib: Sends email alerts
  • time: Time-related functions
try:
    from email.mime.text import MimeText
    from email.mime.multipart import MimeMultipart
except ImportError:
    from email.message import EmailMessage
    MimeText = EmailMessage
    MimeMultipart = EmailMessage

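What this does: This wraps the email imports in a fallback. Note that the standard library names these classes MIMEText and MIMEMultipart (upper-case "MIME"), so on Python 3 the first import raises ImportError and both names end up bound to EmailMessage. EmailMessage does not take the same constructor arguments, so later calls such as MimeText(body, 'plain') only work if the import spelling is corrected.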
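
A minimal sketch of the imports using the standard-library spellings MIMEText and MIMEMultipart, aliased to the names the rest of the file uses. The build_message helper and the example addresses are hypothetical, and nothing is sent:

```python
from email.mime.multipart import MIMEMultipart as MimeMultipart
from email.mime.text import MIMEText as MimeText


def build_message(sender, recipient, subject, body):
    """Assemble a plain-text multipart email (nothing is sent here)."""
    msg = MimeMultipart()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.attach(MimeText(body, "plain"))
    return msg


msg = build_message(
    "alerts@example.com",      # placeholder sender
    "security@example.com",    # placeholder recipient
    "WEAPON DETECTION ALERT",
    "Test body",
)
print(msg["Subject"])
```

With the aliases in place, the later calls to MimeMultipart() and MimeText(body, 'plain') work unchanged.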

from werkzeug.utils import secure_filename
import tempfile
from datetime import datetime, timedelta
import config
from models import db, DetectionSession, WeaponDetection, VideoClip, AlertHistory, User
from sqlalchemy import func
from flask_migrate import Migrate
from ollama_service import ollama_service
import uuid

What this does:

  • secure_filename: Makes uploaded filenames safe (removes dangerous characters)
  • tempfile: Creates temporary files for processing
  • datetime: Handles dates and times
  • config: Our custom configuration file
  • models: Our database structure definitions
  • sqlalchemy: Database operations
  • flask_migrate: Database schema updates
  • ollama_service: AI chatbot functionality
  • uuid: Creates unique identifiers

2. Flask Application Configuration

app = Flask(__name__)
CORS(app)

What this does:

  • Creates a Flask web application
  • Enables CORS so web browsers can access the API from different domains
app.config['SQLALCHEMY_DATABASE_URI'] = config.DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = config.SQLALCHEMY_TRACK_MODIFICATIONS
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = config.SQLALCHEMY_ENGINE_OPTIONS

What this does: Configures the database connection using settings from the config file.

db.init_app(app)
migrate = Migrate(app, db)

What this does:

  • Connects the database to the Flask app
  • Sets up database migration tools (for updating database structure)
model = None
saved_clips_dir = None

What this does: Creates global variables that will hold the AI model and the directory for saved video clips.


3. Initialization Functions

initialize_app() Function

def initialize_app():
    """Initialize the Flask application with model and directories."""
    global model, saved_clips_dir

What this does: This function sets up everything the application needs to run. The global keyword means it can modify variables defined outside the function.

try:
    model_path = config.MODEL_PATH
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found at {model_path}")

What this does:

  • Gets the path to the AI model file from config
  • Checks if the file actually exists
  • Raises an error if the model file is missing
import torch

from ultralytics.nn.tasks import DetectionModel
from ultralytics.nn.modules.head import Detect
from ultralytics.nn.modules.conv import Conv
from ultralytics.nn.modules.block import C2f, SPPF

original_load = torch.load

def patched_load(*args, **kwargs):
    if 'weights_only' not in kwargs:
        kwargs['weights_only'] = False
    return original_load(*args, **kwargs)

torch.load = patched_load        
model = YOLO(model_path)

torch.load = original_load

What this does: This is a technical workaround for loading the AI model:

  1. Imports PyTorch (the deep learning framework)
  2. Imports specific YOLO model components
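  3. Temporarily patches torch.load so that weights_only defaults to False, which lets PyTorch unpickle the full checkpoint (this relaxes a safety restriction, so the model file must come from a trusted source)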
  4. Loads the YOLO model
  5. Restores the original loading function
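
One caveat with the patch as written: if YOLO(model_path) raises, the line that restores torch.load never runs. A minimal sketch of the same idea wrapped in a context manager with try/finally follows. Here patched_attr is a hypothetical helper, and fake_torch is a stand-in object so the example runs without PyTorch installed:

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def patched_attr(obj, name, replacement):
    """Swap obj.<name> for replacement, restoring the original on exit."""
    original = getattr(obj, name)
    setattr(obj, name, replacement)
    try:
        yield original
    finally:
        # Runs even if the body raises, so the patch never leaks.
        setattr(obj, name, original)


# Stand-in for the torch module (assumption: only .load matters here).
fake_torch = SimpleNamespace(
    load=lambda path, weights_only=True: (path, weights_only)
)
original_load = fake_torch.load


def permissive_load(*args, **kwargs):
    # Same rule as patched_load in the article: only fill in a default.
    kwargs.setdefault("weights_only", False)
    return original_load(*args, **kwargs)


with patched_attr(fake_torch, "load", permissive_load):
    inside = fake_torch.load("model.pt")   # patched call

after = fake_torch.load("model.pt")        # original behaviour again
print(inside, after)
```

In the real file the with block would wrap model = YOLO(model_path) and the target would be the torch module itself.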
saved_clips_dir = config.SAVED_CLIPS_FOLDER
os.makedirs(saved_clips_dir, exist_ok=True)
return True

What this does:

  • Sets up the directory where video clips will be saved
  • Creates the directory if it doesn't exist
  • Returns True to indicate successful initialization

create_database_tables() Function

def create_database_tables():
    """Create all database tables."""
    try:
        with app.app_context():
            db.create_all()

What this does:

  • Creates all database tables defined in our models
  • Uses Flask's application context (required for database operations)
if not User.query.first():
    from werkzeug.security import generate_password_hash
    admin_user = User(
        username='admin',
        email='admin@weapondetection.com',
        password_hash=generate_password_hash('admin123'),
        role='admin'
    )
    db.session.add(admin_user)
    db.session.commit()

What this does:

  • Checks if any users exist in the database
  • If no users exist, creates a default admin user
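  • Hashes the password before storing it (the plain-text default 'admin123' is hard-coded in the source, so it should be changed after the first login)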
  • Saves the user to the database

4. Utility Functions

File Validation Functions

def is_allowed_file(filename):
    """Check if the uploaded file has an allowed extension."""
    if not filename:
        return False
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in config.ALLOWED_EXTENSIONS

What this does:

  • Takes a filename as input
  • Checks if it has a file extension (contains a dot)
  • Extracts the extension (part after the last dot)
  • Converts to lowercase and checks if it's in the allowed list
  • Returns True if allowed, False if not
def is_video_file(filename):
    """Check if the file is a video file."""
    if not filename:
        return False
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in config.VIDEO_EXTENSIONS

What this does: Similar to is_allowed_file() but specifically checks for video file extensions.
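
To see how both checks behave on edge cases, here is a small self-contained sketch. The extension sets are hypothetical stand-ins for config.ALLOWED_EXTENSIONS and config.VIDEO_EXTENSIONS, and extension_of is a helper added for illustration:

```python
# Hypothetical stand-ins for the values defined in config.py.
IMAGE_EXTENSIONS = {"jpg", "jpeg", "png"}
VIDEO_EXTENSIONS = {"mp4", "avi", "mov"}
ALLOWED_EXTENSIONS = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS


def extension_of(filename):
    """Return the lower-cased text after the last dot, or '' if there is none."""
    if not filename or "." not in filename:
        return ""
    return filename.rsplit(".", 1)[1].lower()


def is_allowed_file(filename):
    return extension_of(filename) in ALLOWED_EXTENSIONS


def is_video_file(filename):
    return extension_of(filename) in VIDEO_EXTENSIONS


for name in ["photo.JPG", "clip.tar.mp4", "notes.txt", "noextension", ""]:
    print(repr(name), is_allowed_file(name), is_video_file(name))
```

Only the text after the last dot is examined, so these checks validate the name and say nothing about the file's actual contents.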


5. Email Alert System

send_email_alert() Function

def send_email_alert(detections, saved_clips=None):
    """Send email alert when weapons are detected."""
    try:
        if not config.EMAIL_ENABLED:
            print("Email alerts disabled in configuration")
            return

What this does:

  • Function to send email when weapons are detected
  • First checks if email alerts are enabled in configuration
  • If disabled, prints a message and exits
try:
    msg = MimeMultipart()
    msg['From'] = config.SMTP_USERNAME
    msg['To'] = config.ALERT_EMAIL
    msg['Subject'] = "WEAPON DETECTION ALERT"

What this does:

  • Creates an email message object
  • Sets the sender, recipient, and subject
  • Uses SMTP settings from configuration
body = "SECURITY ALERT: Weapons detected in uploaded media\n\n"
body += "Detection Details:\n"
body += "-" * 40 + "\n"

for detection in detections:
    confidence_percent = detection['confidence'] * 100
    body += f"• {detection['name']}: {confidence_percent:.1f}% confidence\n"

What this does:

  • Creates the email body text
  • Adds a header and separator line
  • Loops through each detected weapon
  • Converts confidence from decimal (0.85) to percentage (85.0%)
  • Adds each detection to the email body
if saved_clips:
    body += "\nSaved Video Clips:\n"
    body += "-" * 40 + "\n"
    for clip in saved_clips:
        body += f"• File: {clip['filename']}\n"
        body += f"  Duration: {clip['duration']:.1f}s\n"
        body += f"  Confidence: {clip['confidence']:.1f}%\n\n"

What this does:

  • If video clips were saved, adds information about them to the email
  • Shows filename, duration, and confidence level for each clip
body += f"\nTimestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
body += "Please take immediate action if required."

msg.attach(MimeText(body, 'plain'))
email_text = msg.as_string()

What this does:

  • Adds current timestamp to the email
  • Adds a call-to-action message
  • Attaches the text to the email message
  • Converts the message to string format
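
The body-building steps can also be collected into one pure function, which makes the text easy to check without sending any mail. This is a sketch that assumes the same dictionary keys used above; build_alert_body and its now parameter are hypothetical, and the clip line is simplified:

```python
from datetime import datetime


def build_alert_body(detections, saved_clips=None, now=None):
    """Return the plain-text alert body for a list of detection dicts."""
    now = now or datetime.now()
    lines = [
        "SECURITY ALERT: Weapons detected in uploaded media",
        "",
        "Detection Details:",
        "-" * 40,
    ]
    for d in detections:
        # Confidence arrives as a fraction (0.85) and is shown as a percentage.
        lines.append(f"• {d['name']}: {d['confidence'] * 100:.1f}% confidence")
    for clip in saved_clips or []:
        lines.append(f"• Clip: {clip['filename']} ({clip['duration']:.1f}s)")
    lines.append("")
    lines.append(f"Timestamp: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append("Please take immediate action if required.")
    return "\n".join(lines)


body = build_alert_body(
    [{"name": "knife", "confidence": 0.85}],
    now=datetime(2024, 1, 2, 3, 4, 5),  # fixed time so the output is repeatable
)
print(body)
```

Passing the time in as a parameter keeps the function deterministic, which is convenient for unit tests.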
with smtplib.SMTP(config.SMTP_SERVER, config.SMTP_PORT) as server:
    server.starttls()
    server.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
    server.sendmail(config.SMTP_USERNAME, config.ALERT_EMAIL, email_text)

print("Email alert sent successfully")

What this does:

  • Connects to the email server
  • Starts TLS encryption for security
  • Logs in with username and password
  • Sends the email
  • Prints success message

6. Weapon Detection Functions

detect_weapons_in_image() Function

def detect_weapons_in_image(image_path):
    """Detect weapons in a single image."""
    try:
        results = model(image_path)
        detections = []

What this does:

  • Takes an image file path as input
  • Runs the YOLO model on the image
  • Creates an empty list to store detected weapons
for r in results:
    boxes = r.boxes
    if boxes is not None:
        for box in boxes:
            class_id = int(box.cls)
            class_name = model.names[class_id]
            confidence = float(box.conf)

What this does:

  • Loops through all detection results
  • Gets the bounding boxes (rectangles around detected objects)
  • For each box, extracts:
    • class_id: Numeric ID of the detected object type
    • class_name: Human-readable name (e.g., "knife", "gun")
    • confidence: How sure the AI is (0.0 to 1.0)
print(f"Detected: {class_name} with confidence: {confidence:.3f}")

if class_name in config.DETECTION_CLASSES and confidence >= config.CONFIDENCE_THRESHOLD:
    detections.append({
        'name': class_name,
        'confidence': confidence
    })
    print(f"WEAPON ALERT: {class_name} - {confidence*100:.1f}%")

What this does:

  • Prints what was detected
  • Checks if the detected object is a weapon we care about
  • Checks if confidence is high enough (above threshold)
  • If both conditions are met, adds it to the detections list
  • Prints an alert message
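
The filtering rule can be tried in isolation from the model. In this sketch the class set and threshold are hypothetical stand-ins for config.DETECTION_CLASSES and config.CONFIDENCE_THRESHOLD, and the raw list imitates what the loop over boxes would produce:

```python
# Hypothetical stand-ins for the values in config.py.
DETECTION_CLASSES = {"knife", "gun"}
CONFIDENCE_THRESHOLD = 0.5


def filter_detections(raw, classes=DETECTION_CLASSES, threshold=CONFIDENCE_THRESHOLD):
    """Keep (name, confidence) pairs for watched classes at or above the threshold."""
    return [
        {"name": name, "confidence": conf}
        for name, conf in raw
        if name in classes and conf >= threshold
    ]


raw = [("knife", 0.91), ("person", 0.99), ("gun", 0.32), ("gun", 0.5)]
kept = filter_detections(raw)
print(kept)
```

The "person" result is dropped because of its class, and the 0.32 "gun" because of its confidence. A value exactly equal to the threshold is kept, since the comparison is >=.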

7. Video Processing

detect_weapons_in_video() Function

This is the most complex function in the file. It reads the video frame by frame and runs detection on a sampled subset of those frames.

def detect_weapons_in_video(video_path):
    """Detect weapons in video with immediate alerts for lethal weapons."""
    start_time = time.time()
    
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Could not open video file")

What this does:

  • Records start time for performance measurement
  • Opens the video file using OpenCV
  • Checks if the video opened successfully
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"Processing video: {fps:.1f} FPS, {total_frames} frames")

What this does:

  • Gets the video's frames per second (FPS)
  • Gets the total number of frames in the video
  • Prints video information
detections = []
weapon_detected = {}
saved_clips = []
frame_count = 0
alert_sent = False 

continuous_detections = {}

What this does: Creates variables to track:

  • detections: List of all weapons found
  • weapon_detected: Dictionary of unique weapons and their details
  • saved_clips: List of video clips that were saved
  • frame_count: Current frame number
  • alert_sent: Whether an email alert was already sent
  • continuous_detections: Tracks weapons seen across multiple frames
def get_frame_skip_rate(total_frames):
    """Calculate optimal frame skip rate based on video length."""
    if total_frames < 300: 
        return 5
    elif total_frames < 900:  
        return 10
    else:  
        return 15

What this does: This inner function optimizes processing speed:

  • For short videos (< 300 frames): check every 5th frame
  • For medium videos (< 900 frames): check every 10th frame
  • For long videos: check every 15th frame
  • This cuts the number of model calls by a factor of 5 to 15, at the risk of missing a weapon that appears only between sampled frames
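
To get a feel for the trade-off, this sketch counts how many frames actually reach the model for a few video lengths. frames_analyzed is a hypothetical helper that mirrors the frame_count % frame_skip_rate check with a counter starting at 1:

```python
def get_frame_skip_rate(total_frames):
    """Same thresholds as the function in the article."""
    if total_frames < 300:
        return 5
    elif total_frames < 900:
        return 10
    return 15


def frames_analyzed(total_frames):
    """Count frames whose 1-based index is a multiple of the skip rate."""
    return total_frames // get_frame_skip_rate(total_frames)


for total in (299, 300, 900, 9000):
    skip = get_frame_skip_rate(total)
    print(f"{total} frames -> every {skip}th frame -> {frames_analyzed(total)} model calls")
```

Note the jump at the boundaries: a 299-frame video gets 59 model calls while a 300-frame video gets only 30. At 30 FPS, a skip rate of 15 samples the video once every half second.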
while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_count += 1
    
    if frame_count % frame_skip_rate != 0:
        continue

What this does:

  • Main processing loop that reads each frame
  • ret is True if frame was read successfully
  • If no more frames, exit the loop
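  • Skips frames based on the calculated skip rate (frame_skip_rate is presumably assigned from get_frame_skip_rate(total_frames); that line is not shown in this excerpt)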
current_time = frame_count / fps
results = model(frame)
current_detections = set()

What this does:

  • Calculates the current time in the video (frame number ÷ FPS)
  • Runs the AI model on the current frame
  • Creates a set to track what's detected in this frame
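
The division frame_count / fps fails with ZeroDivisionError if the video reports an FPS of 0, which can happen when the container lacks that metadata. A minimal defensive sketch; frame_to_seconds and the 30 FPS fallback are assumptions, not part of the original file:

```python
def frame_to_seconds(frame_index, fps, fallback_fps=30.0):
    """Convert a frame index to seconds, guarding against a missing FPS value."""
    # Treat 0, None, NaN (nan != nan) and negative values as "unknown".
    if not fps or fps != fps or fps <= 0:
        fps = fallback_fps
    return frame_index / fps


print(frame_to_seconds(150, 30.0))  # normal case
print(frame_to_seconds(150, 0))     # FPS unknown, fallback used
```

With a fallback the timestamps are only approximate, so it may be preferable to reject such a video outright. Which behaviour fits best depends on how the timestamps are used downstream.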
for r in results:
    boxes = r.boxes
    if boxes is not None:
        for box in boxes:
            class_id = int(box.cls)
            class_name = model.names[class_id]
            confidence = float(box.conf)
            
            print(f"Frame {frame_count}: {class_name} - {confidence:.3f}")

What this does: Similar to image detection, but for each frame:

  • Extracts detection information
  • Prints what was found in this frame
if class_name in config.DETECTION_CLASSES:
    print(f"Found weapon class: {class_name} with confidence {confidence:.3f}")
    print(f"Confidence threshold: {config.CONFIDENCE_THRESHOLD}")
    
    if confidence >= config.CONFIDENCE_THRESHOLD:
        current_detections.add(class_name)

What this does:

  • Checks if detected object is a weapon type we care about
  • Prints debug information
  • If confidence is high enough, adds to current frame's detections
if class_name not in weapon_detected or confidence > weapon_detected[class_name]['confidence']:
    weapon_detected[class_name] = {
        'name': class_name,
        'confidence': confidence,
        'first_detected_time': current_time,
        'frame': frame_count
    }
    
    print(f"WEAPON DETECTED: {class_name} - {confidence*100:.1f}%")

What this does:

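  • If this is a new weapon OR higher confidence than before:
    • Records the weapon details
    • Stores the current time and frame (because the whole entry is overwritten, first_detected_time ends up holding the time of the highest-confidence sighting, not the earliest one)
    • Prints an alert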
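
If the intent is for first_detected_time to keep the earliest sighting while confidence tracks the best one, the update can be split into two cases. This is a sketch of that alternative, not the file's actual behavior; update_best is a hypothetical helper:

```python
def update_best(weapon_detected, name, confidence, time_s, frame):
    """Track the highest confidence per class while preserving the first sighting."""
    entry = weapon_detected.get(name)
    if entry is None:
        # First time this class is seen: record everything.
        weapon_detected[name] = {
            "name": name,
            "confidence": confidence,
            "first_detected_time": time_s,
            "frame": frame,
        }
    elif confidence > entry["confidence"]:
        # Better sighting: raise the confidence, keep the first-sighting fields.
        entry["confidence"] = confidence
    return weapon_detected


seen = {}
update_best(seen, "knife", 0.60, 1.0, 30)
update_best(seen, "knife", 0.90, 4.0, 120)
update_best(seen, "knife", 0.70, 6.0, 180)
print(seen["knife"])
```

After the three updates the entry holds the peak confidence of 0.90 together with the first sighting at 1.0 seconds, frame 30.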
if not alert_sent:
    print(f"Sending immediate alert for {class_name} detection!")
    immediate_detections = [{
        'name': class_name,
        'confidence': confidence
    }]
    try:
        send_email_alert(immediate_detections)
        alert_sent = True
    except Exception as email_error:
        print(f"Email alert failed but continuing detection: {email_error}")
        alert_sent = True

What this does:

  • If no email alert has been sent yet:
    • Sends an immediate email alert
    • Marks that alert has been sent
    • If the email fails, logs the error, still sets alert_sent to True (so no retry happens for this video), and continues processing

Video Clip Saving Logic

if confidence >= config.CONTINUOUS_DETECTION_THRESHOLD:
    if class_name not in continuous_detections:
        continuous_detections[class_name] = {
            'start_time': current_time,
            'start_frame': frame_count,
            'max_confidence': confidence,
            'frames': []
        }

What this does:

  • If confidence is high enough for clip saving:
    • Starts tracking this weapon continuously
    • Records when tracking started
    • Initializes tracking data
continuous_detections[class_name]['max_confidence'] = max(
    continuous_detections[class_name]['max_confidence'],
    confidence
)
continuous_detections[class_name]['frames'].append({
    'frame': frame_count,
    'time': current_time,
    'confidence': confidence
})

What this does:

  • Updates the maximum confidence seen for this weapon
  • Records this frame's detection data

Clip Saving Decision Logic

weapons_to_remove = []
for weapon_name, track_info in continuous_detections.items():
    detection_duration = current_time - track_info['start_time']
    
    if weapon_name not in current_detections:
        if detection_duration >= config.CONTINUOUS_DETECTION_DURATION:
            clip_filename = save_video_clip(
                video_path, 
                track_info['start_time'], 
                current_time,
                weapon_name,
                track_info['max_confidence']
            )

What this does:

  • For each weapon being tracked:
    • Calculates how long it's been detected
    • If weapon is no longer visible in current frame:
      • If it was visible long enough, saves a video clip
      • Calls the save_video_clip() function
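
The decision logic can be exercised without any video by working on plain dictionaries. In this sketch close_finished_tracks is a hypothetical helper. It assumes, as the weapons_to_remove list suggests, that a track is dropped once its weapon leaves the frame, whether or not a clip was saved:

```python
def close_finished_tracks(tracks, visible_now, current_time, min_duration):
    """Return clips to save for tracks that just ended, and drop ended tracks.

    tracks: {weapon_name: {"start_time": float, "max_confidence": float}}
    visible_now: set of weapon names detected in the current frame
    """
    to_save, ended = [], []
    for name, info in tracks.items():
        if name in visible_now:
            continue  # still on screen, keep tracking
        ended.append(name)
        duration = current_time - info["start_time"]
        if duration >= min_duration:
            to_save.append((name, info["start_time"], current_time, info["max_confidence"]))
    # Delete after the loop: changing a dict's size while iterating raises RuntimeError.
    for name in ended:
        del tracks[name]
    return to_save


tracks = {
    "knife": {"start_time": 2.0, "max_confidence": 0.9},
    "gun": {"start_time": 9.5, "max_confidence": 0.8},
    "rifle": {"start_time": 1.0, "max_confidence": 0.7},
}
clips = close_finished_tracks(tracks, visible_now={"rifle"}, current_time=10.0, min_duration=3.0)
print(clips, sorted(tracks))
```

Here the knife was tracked for 8 seconds and yields a clip, the gun was tracked for only 0.5 seconds and is discarded, and the rifle is still visible so its track stays open.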

save_video_clip() Function

def save_video_clip(video_path, start_time, end_time, weapon_name, confidence):
    """Save a video clip of the detected weapon."""
    try:
        buffer_time = 2.0
        clip_start = max(0, start_time - buffer_time)
        clip_end = end_time + buffer_time

What this does:

  • Adds 2 seconds before and after the detection
  • Ensures start time isn't negative
  • This gives context around the detection
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
confidence_str = f"{confidence*100:.0f}"
clip_filename = f"weapon_detected_{weapon_name.lower()}_{timestamp}_conf{confidence_str}%.mp4"
clip_path = os.path.join(saved_clips_dir, clip_filename)

What this does:

  • Creates a unique filename with:
    • Current date and time
    • Weapon name
    • Confidence percentage
  • Combines with the clips directory path
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)

start_frame = int(clip_start * fps)
end_frame = int(clip_end * fps)

cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

What this does:

  • Opens the original video file
  • Converts time to frame numbers
  • Positions the video at the start frame
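
The buffer and time-to-frame arithmetic can be checked on its own. clip_frame_range is a hypothetical helper. The optional clamp to total_frames is an addition, since the original relies on cap.read() returning False at the end of the file instead:

```python
def clip_frame_range(start_time, end_time, fps, total_frames=None, buffer_time=2.0):
    """Return (start_frame, end_frame) for a clip padded by buffer_time seconds."""
    clip_start = max(0.0, start_time - buffer_time)  # never before the first frame
    clip_end = end_time + buffer_time
    start_frame = int(clip_start * fps)
    end_frame = int(clip_end * fps)
    if total_frames is not None:
        # Clamp to the last valid frame index.
        end_frame = min(end_frame, total_frames - 1)
    return start_frame, end_frame


print(clip_frame_range(1.0, 5.0, fps=30.0))
print(clip_frame_range(10.0, 12.0, fps=25.0, total_frames=320))
```

In the first call the 2-second lead-in would start at -1.0 seconds, so it is clipped to frame 0. In the second, the padded end (frame 350) runs past the video and is clamped to frame 319.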
ret, frame = cap.read()
if not ret:
    cap.release()
    return None
    
height, width, _ = frame.shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(clip_path, fourcc, fps, (width, height))

What this does:

  • Reads a frame to get video dimensions
  • Sets up video encoding format (mp4v)
  • Creates a video writer object
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

frame_count = start_frame
while frame_count <= end_frame:
    ret, frame = cap.read()
    if not ret:
        break
    out.write(frame)
    frame_count += 1

cap.release()
out.release()

What this does:

  • Goes back to start frame
  • Reads each frame from start to end
  • Writes each frame to the new clip file
  • Closes both video files

8. Web API Endpoints

The application provides many web endpoints that external applications (like websites) can call.

Home Endpoint

@app.route('/', methods=['GET'])
def home():
    """API information endpoint."""
    return jsonify({
        'message': 'Weapon Detection API',
        'version': '2.0',
        'endpoints': {
            'detect': 'POST /detect - Upload image/video for weapon detection',
            'health': 'GET /health - Check API health',
            'clips': 'GET /clips - List saved video clips'
        },
        'supported_formats': {
            'images': list(config.IMAGE_EXTENSIONS),
            'videos': list(config.VIDEO_EXTENSIONS)
        },
        'detection_classes': config.DETECTION_CLASSES
    })

What this does:

  • @app.route('/', methods=['GET']): When someone visits the main URL with GET request
  • Returns JSON information about the API
  • Lists available endpoints
  • Shows supported file formats
  • Shows what types of weapons can be detected

Health Check Endpoint

@app.route('/health', methods=['GET'])
@app.route('/status', methods=['GET'])
def health_check():
    """Enhanced health check endpoint matching frontend requirements."""
    try:
        # Test database connection
        db.session.execute(db.text('SELECT 1'))
        
        return jsonify({
            'status': 'ok',
            'timestamp': datetime.now().isoformat() + 'Z',
            'ai_features': config.OLLAMA_ENABLED,
            'database': 'connected',
            'email_alerts': config.EMAIL_ENABLED,
            'model': config.MODEL_PATH
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'timestamp': datetime.now().isoformat() + 'Z',
            'error': str(e)
        }), 500

What this does:

  • Responds to both /health and /status URLs
  • Tests if database is working by running a simple query
  • Returns system status information:
    • Overall status (ok/error)
    • Current timestamp (taken from datetime.now(), which is local time, even though the appended 'Z' suffix denotes UTC)
    • Whether AI features are enabled
    • Database connection status
    • Whether email alerts are enabled
    • Model file path
  • If anything fails, returns error status with 500 code
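
Since a trailing 'Z' means UTC in ISO 8601, the timestamp is only correct if the time itself is UTC. A minimal sketch of one way to produce such a value with the standard library; utc_timestamp is a hypothetical helper:

```python
from datetime import datetime, timezone


def utc_timestamp(now=None):
    """Return an ISO 8601 timestamp in UTC with a trailing 'Z'."""
    now = now or datetime.now(timezone.utc)
    # isoformat() on an aware UTC datetime ends in "+00:00"; swap that for "Z".
    return now.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


stamp = utc_timestamp(datetime(2024, 5, 1, 12, 30, 0, tzinfo=timezone.utc))
print(stamp)
```

The same helper could serve every endpoint that returns a timestamp, which would also keep the format consistent across responses.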

Main Detection Endpoint

@app.route('/detect', methods=['POST'])
@app.route('/upload', methods=['POST'])
def detect_weapons():
    """Main weapon detection endpoint with comprehensive error handling."""
    session_id = str(uuid.uuid4())
    start_time = time.time()
    
    try:
        print(f"Detection request received (Session: {session_id})")

What this does:

  • Responds to both /detect and /upload URLs
  • Only accepts POST requests (for file uploads)
  • Creates a unique session ID for this detection
  • Records start time for performance tracking
if 'file' not in request.files:
    return jsonify({
        'error': True,
        'message': 'No file uploaded',
        'session_id': session_id
    }), 400

What this does:

  • Checks if a file was actually uploaded
  • Returns error if no file found
  • HTTP 400 = "Bad Request" (client error)
file = request.files['file']
if file.filename == '':
    return jsonify({
        'error': True,
        'message': 'No file selected',
        'session_id': session_id
    }), 400

What this does:

  • Gets the uploaded file
  • Checks if filename is empty
  • Returns error if no filename
if not is_allowed_file(file.filename):
    return jsonify({
        'error': True,
        'message': f'File type not supported. Allowed: {", ".join(config.ALLOWED_EXTENSIONS)}',
        'session_id': session_id
    }), 400

What this does:

  • Checks if file type is allowed
  • If not, returns list of allowed file types in error message
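
The three checks form a chain where the first failure wins. The sketch below reproduces that chain as a plain function so it can be tested without Flask. validate_upload is hypothetical, the files dict stands in for request.files, and the extension set is an assumed config value:

```python
ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "mp4"}  # hypothetical config value


def validate_upload(files, session_id):
    """Return (error_payload, 400) for a bad upload, or (None, 200) if it passes.

    `files` maps form field name -> filename, standing in for request.files.
    """
    def error(message):
        return {"error": True, "message": message, "session_id": session_id}, 400

    if "file" not in files:
        return error("No file uploaded")
    filename = files["file"]
    if filename == "":
        return error("No file selected")
    if "." not in filename or filename.rsplit(".", 1)[1].lower() not in ALLOWED_EXTENSIONS:
        allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
        return error(f"File type not supported. Allowed: {allowed}")
    return None, 200


print(validate_upload({}, "s1"))
print(validate_upload({"file": "report.exe"}, "s1"))
print(validate_upload({"file": "photo.PNG"}, "s1"))
```

Every error payload carries the session_id, so a client can quote it when reporting a failed upload.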
filename = secure_filename(file.filename)
file_extension = filename.rsplit('.', 1)[1].lower()
file_size = request.content_length or 0

# Create detection session in database
session = DetectionSession(
    session_id=session_id,
    filename=filename,
    file_type=file_extension,
    file_size=file_size,
    upload_timestamp=datetime.utcnow(),
    status='processing',
    ip_address=request.remote_addr,
    user_agent=request.headers.get('User-Agent', 'Unknown')
)
db.session.add(session)
db.session.commit()

What this does:

  • Makes filename safe (removes dangerous characters)
  • Extracts file extension
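  • Reads request.content_length as the file size (this is the size of the whole request body, so it slightly overstates the size of the file itself)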
  • Creates a database record for this detection session
  • Records client information (IP address, browser)
  • Saves to database immediately
with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_extension}') as temp_file:
    file.save(temp_file.name)
    temp_filepath = temp_file.name

print(f"File saved temporarily: {temp_filepath}")

What this does:

  • Creates a temporary file on the server
  • Saves the uploaded file to this temporary location
  • Keeps track of the temporary file path
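
Because the file is created with delete=False, something must remove it later, including when detection raises. A minimal sketch of the save-process-delete pattern using try/finally; process_upload and fake_detector are hypothetical stand-ins:

```python
import os
import tempfile


def process_upload(data, extension, handler):
    """Write bytes to a temp file, run handler(path), and always delete the file."""
    # Close the handle before processing so other code can open the path by name.
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{extension}") as tmp:
        tmp.write(data)
        path = tmp.name
    try:
        return handler(path)
    finally:
        # Runs on success and on error, so no temp files pile up.
        os.unlink(path)


seen = {}


def fake_detector(path):
    # Stand-in for detect_weapons_in_image / detect_weapons_in_video.
    seen["path"] = path
    return os.path.getsize(path)


size = process_upload(b"12345", "jpg", fake_detector)
print(size, os.path.exists(seen["path"]))
```

The handler sees a real file with the right extension, and the file is gone by the time process_upload returns.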

Detection Processing

if is_video_file(filename):
    print(f"Processing video file: {filename}")
    detections, saved_clips = detect_weapons_in_video(temp_filepath)
    processing_type = 'video'
else:
    print(f"Processing image file: {filename}")
    detections = detect_weapons_in_image(temp_filepath)
    saved_clips = []
    processing_type = 'image'

What this does:

  • Determines if uploaded file is video or image
  • Calls appropriate detection function
  • For images, no clips can be saved (empty list)
processing_time = time.time() - start_time

# Update session with results
session.processing_time = processing_time
session.status = 'completed'
db.session.commit()

print(f"Processing completed in {processing_time:.2f} seconds")

What this does:

  • Calculates total processing time
  • Updates database record with results
  • Marks session as completed

Saving Detection Results

detection_records = []
for detection in detections:
    weapon_detection = WeaponDetection(
        session_id=session_id,
        weapon_type=detection['name'],
        confidence=detection['confidence'],
        first_detected_time=detection.get('first_detected_time'),
        frame_number=detection.get('frame'),
        detection_timestamp=datetime.utcnow()
    )
    db.session.add(weapon_detection)
    detection_records.append(weapon_detection)
    print(f"Detection logged to database: {session_id}")

What this does:

  • For each detected weapon:
    • Creates a database record
    • Saves all the detection details
    • Adds to database session
    • Keeps track of created records

Email Alert Logic

if detections:
    try:
        send_email_alert(detections, saved_clips)
        
        alert_record = AlertHistory(
            session_id=session_id,
            alert_type='weapon_detection',
            recipient_email=config.ALERT_EMAIL,
            subject='WEAPON DETECTION ALERT',
            body=f'Weapons detected: {[d["name"] for d in detections]}',
            sent_at=datetime.utcnow(),
            delivery_status='sent'
        )
        db.session.add(alert_record)
        
    except Exception as e:
        print(f"Email sending failed: {e}")
        
        alert_record = AlertHistory(
            session_id=session_id,
            alert_type='weapon_detection',
            recipient_email=config.ALERT_EMAIL,
            subject='WEAPON DETECTION ALERT',
            body=f'Weapons detected: {[d["name"] for d in detections]}',
            sent_at=datetime.utcnow(),
            delivery_status='failed',
            error_message=str(e)
        )
        db.session.add(alert_record)
        
    print(f"Email alert logged to database for session: {session_id}")

What this does:

  • If weapons were detected:
    • Tries to send email alert
    • Creates database record of email attempt
    • If email succeeds: marks as 'sent'
    • If email fails: marks as 'failed' and records error
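    • Always logs the attempt
    • For videos this is a second email, because detect_weapons_in_video() has already sent an immediate alert on the first detection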

Response Generation

response_data = {
    'session_id': session_id,
    'processing_time': round(processing_time, 2),
    'file_info': {
        'filename': filename,
        'size': file_size,
        'type': processing_type
    },
    'detections': {
        'count': len(detections),
        'weapons': []
    }
}

for detection in detections:
    weapon_info = {
        'name': detection['name'],
        'confidence': round(detection['confidence'], 4),
        'confidence_percent': round(detection['confidence'] * 100, 1)
    }
    
    if 'first_detected_time' in detection:
        weapon_info['first_detected_time'] = detection['first_detected_time']
    if 'frame' in detection:
        weapon_info['frame_number'] = detection['frame']
        
    response_data['detections']['weapons'].append(weapon_info)

What this does:

  • Creates a structured response with all detection information
  • Includes session details, processing time, file info
  • For each detection, includes name, confidence, and timing info
  • Rounds numbers for cleaner display
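
Pulled into a standalone function, the response assembly looks like the sketch below. build_response is a hypothetical name, and the sample values are made up for illustration:

```python
import json


def build_response(session_id, processing_time, filename, file_size, processing_type, detections):
    """Assemble the JSON-ready response dictionary for one detection session."""
    weapons = []
    for d in detections:
        info = {
            "name": d["name"],
            "confidence": round(d["confidence"], 4),
            "confidence_percent": round(d["confidence"] * 100, 1),
        }
        # Timing fields exist only for video detections.
        if "first_detected_time" in d:
            info["first_detected_time"] = d["first_detected_time"]
        if "frame" in d:
            info["frame_number"] = d["frame"]
        weapons.append(info)
    return {
        "session_id": session_id,
        "processing_time": round(processing_time, 2),
        "file_info": {"filename": filename, "size": file_size, "type": processing_type},
        "detections": {"count": len(weapons), "weapons": weapons},
    }


response = build_response(
    "demo-session", 1.23456, "clip.mp4", 2048, "video",
    [{"name": "knife", "confidence": 0.87654, "first_detected_time": 2.5, "frame": 75}],
)
print(json.dumps(response, indent=2))
```

Note that the internal key frame is exposed to clients as frame_number.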
if saved_clips:
    response_data['clips'] = {
        'count': len(saved_clips),
        'files': []
    }
    
    for clip in saved_clips:
        clip_info = {
            'filename': clip['filename'],
            'weapon': clip['weapon'],
            'confidence': round(clip['confidence'], 4),
            'duration': round(clip['duration'], 1),
            'start_time': round(clip['start_time'], 1)
        }
        response_data['clips']['files'].append(clip_info)

What this does:

  • If video clips were saved, adds clip information to response
  • Includes filename, weapon type, confidence, duration, and start time

Cleanup and Final Response

try:
    os.unlink(temp_filepath)
    print(f"Temporary file cleaned up: {temp_filepath}")
except Exception as cleanup_error:
    print(f"Warning: Could not clean up temporary file: {cleanup_error}")

db.session.commit()

print(f"Detection complete: {len(detections)} weapons found (Session: {session_id})")

return jsonify(response_data)

What this does:

  • Deletes the temporary file (cleanup)
  • If cleanup fails, just warns (doesn't crash)
  • Commits all database changes
  • Prints summary
  • Returns the complete response as JSON

9. Database Operations

Stats and Dashboard Endpoints

@app.route('/api/stats/dashboard', methods=['GET'])
@app.route('/stats/dashboard', methods=['GET']) 
@app.route('/dashboard/stats', methods=['GET'])
def dashboard_stats():
    """Get dashboard statistics."""
    try:
        # Get recent data (last 30 days)
        start_date = datetime.utcnow() - timedelta(days=30)
        
        # Count total sessions and detections
        total_sessions = DetectionSession.query.filter(
            DetectionSession.upload_timestamp >= start_date
        ).count()
        
        total_detections = WeaponDetection.query.join(DetectionSession).filter(
            DetectionSession.upload_timestamp >= start_date
        ).count()

What this does:

  • Responds to multiple URL patterns (different ways frontends might call it)
  • Calculates statistics for the last 30 days
  • Counts total detection sessions and weapon detections
  • Uses database joins to connect related tables
# Today's stats
today = datetime.utcnow().date()
today_sessions = DetectionSession.query.filter(
    db.func.date(DetectionSession.upload_timestamp) == today
).count()

recent_detections = WeaponDetection.query.join(DetectionSession).filter(
    DetectionSession.upload_timestamp >= start_date
).all()

What this does:

  • Gets today's date
  • Counts how many sessions happened today
  • Gets all recent weapon detections for analysis
# Calculate average confidence
if recent_detections:
    avg_confidence = sum(float(d.confidence) for d in recent_detections) / len(recent_detections)
else:
    avg_confidence = 0

# Calculate average processing time
sessions_with_time = DetectionSession.query.filter(
    DetectionSession.upload_timestamp >= start_date,
    DetectionSession.processing_time.isnot(None)
).all()

if sessions_with_time:
    avg_processing_time = sum(float(s.processing_time) for s in sessions_with_time) / len(sessions_with_time)
else:
    avg_processing_time = 0

What this does:

  • Calculates average confidence of all detections
  • Finds sessions that have processing time recorded
  • Calculates average processing time
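
Both averages are computed in Python after loading every matching row with .all(). For larger tables, the database can compute the average itself. The sketch below shows the idea with the standard-library sqlite3 module and a hypothetical single-column table. With SQLAlchemy, the equivalent would presumably use func.avg, which is available through the func import at the top of the file:

```python
import sqlite3

# In-memory database with a hypothetical, simplified detections table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weapon_detection (id INTEGER PRIMARY KEY, confidence REAL)")
conn.executemany(
    "INSERT INTO weapon_detection (confidence) VALUES (?)",
    [(0.9,), (0.8,), (0.7,)],
)

# The database returns one number instead of every row.
(avg,) = conn.execute("SELECT AVG(confidence) FROM weapon_detection").fetchone()

# AVG yields NULL (None) for an empty table, so fall back to 0 like the original.
avg_confidence = round(avg or 0, 2)
print(avg_confidence)
```

The result matches the Python-side calculation while transferring a single value from the database.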

Recent Activity Tracking

# Recent activity (last 10 sessions)
recent_sessions = DetectionSession.query.order_by(
    DetectionSession.upload_timestamp.desc()
).limit(10).all()

recent_activity = []
for session in recent_sessions:
    activity = {
        'session_id': session.session_id,
        'timestamp': session.upload_timestamp.isoformat() + 'Z',
        'filename': session.filename,
        'detections_count': session.detections.count(),
        'status': session.status
    }
    recent_activity.append(activity)

What this does:

  • Gets the 10 most recent detection sessions
  • Orders them by timestamp (newest first)
  • For each session, creates a summary with:
    • Session ID
    • When it happened
    • Original filename
    • Number of weapons detected
    • Processing status

Response Assembly

return jsonify({
    'total_sessions': total_sessions,
    'total_detections': total_detections,
    'threats_detected': total_detections,  # Alias for compatibility
    'false_positives': 2,  # Static value - could be made dynamic
    'avg_confidence': round(avg_confidence, 2),
    'avg_processing_time': round(avg_processing_time),
    'scans_today': today_sessions,
    'threats_today': today_sessions,  # Simplified - assumes all scans find threats
    'recent_activity': recent_activity,
    'status': 'success',
    'timestamp': datetime.now().isoformat()
})

What this does:

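  • Creates a single JSON response containing all the statistics
  • Rounds numbers for display (confidence to 2 decimal places, processing time to a whole number)
  • Includes both current names and aliases for compatibility (threats_detected repeats total_detections)
  • Uses placeholder values in two places: false_positives is hardcoded to 2, and threats_today simply repeats the number of scans today
  • Adds status and timestamp for debugging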
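The code comments note that threats_today is simplified. One way it could be made dynamic is to count only today's sessions that have at least one detection. The sketch below shows that idea on plain dictionaries; `count_today` and the `detections_count` key are assumptions for illustration and are not part of app.py.

```python
from datetime import datetime


def count_today(sessions, now):
    """Return (scans_today, threats_today) for sessions uploaded since midnight of now."""
    start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
    todays = [s for s in sessions if s['upload_timestamp'] >= start_of_day]
    # A session counts as a threat only if it produced at least one detection.
    threats = sum(1 for s in todays if s['detections_count'] > 0)
    return len(todays), threats


# Made-up data: two scans today (one with detections) and one from yesterday.
now = datetime(2025, 6, 23, 15, 30)
sessions = [
    {'upload_timestamp': datetime(2025, 6, 23, 9, 0), 'detections_count': 2},
    {'upload_timestamp': datetime(2025, 6, 23, 10, 0), 'detections_count': 0},
    {'upload_timestamp': datetime(2025, 6, 22, 23, 0), 'detections_count': 1},
]

scans_today, threats_today = count_today(sessions, now)
```

Here the two numbers differ (2 scans, 1 threat), which is the distinction the simplified version loses.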

10. AI Integration 🤖

AI Status Endpoint

@app.route('/api/ai/status', methods=['GET'])
@app.route('/ai/status', methods=['GET'])
@app.route('/status/ai', methods=['GET'])
def ai_status_extended():
    """Get detailed AI service status."""
    try:
        return jsonify({
            'available': config.OLLAMA_ENABLED and ollama_service.is_ollama_available(),
            'model': 'SecureVision-AI-v2.1',
            'version': '2.1.0',
            'last_updated': datetime.now().isoformat() + 'Z',
            'health_score': 0.98,
            'ollama_enabled': config.OLLAMA_ENABLED,
            'model_ready': ollama_service.ensure_model_available() if config.OLLAMA_ENABLED else False
        })
    except Exception as e:
        return jsonify({
            'available': False,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }), 500

What this does:

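  • Registers the same function under three URLs so different frontends can reach it
  • Checks if AI chatbot features are enabled in the config and if the Ollama service is running
  • Verifies the AI model is available (only when AI features are enabled)
  • Returns status information; note that model, version, and health_score are hardcoded values, and last_updated is just the current server time with 'Z' appended (datetime.now() is local time, so this is true UTC only if the server runs in UTC)
  • If anything fails, returns an error response with HTTP status 500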
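The expression `config.OLLAMA_ENABLED and ollama_service.is_ollama_available()` relies on short-circuit evaluation: when the flag is False, the service is never contacted. The sketch below demonstrates this with a stand-in class. `StubOllamaService` and `build_ai_status` are hypothetical names; only the two method names come from the code above.

```python
class StubOllamaService:
    """Stand-in for ollama_service that counts how often it is called."""

    def __init__(self, running, model_ready):
        self.running = running
        self.model_ready = model_ready
        self.calls = 0

    def is_ollama_available(self):
        self.calls += 1
        return self.running

    def ensure_model_available(self):
        self.calls += 1
        return self.model_ready


def build_ai_status(enabled, service):
    """Build the availability part of the status payload."""
    return {
        # 'and' stops at the first False value, so a disabled config skips the service call.
        'available': enabled and service.is_ollama_available(),
        'ollama_enabled': enabled,
        'model_ready': service.ensure_model_available() if enabled else False,
    }


disabled_service = StubOllamaService(running=True, model_ready=True)
disabled_status = build_ai_status(False, disabled_service)

enabled_service = StubOllamaService(running=True, model_ready=False)
enabled_status = build_ai_status(True, enabled_service)
```

Passing the service in as a parameter, as done here, is one way to make this kind of logic testable without a running Ollama instance.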

AI Insights Endpoint

@app.route('/api/ai/insights', methods=['GET'])
@app.route('/ai/insights', methods=['GET'])
@app.route('/insights', methods=['GET'])
def ai_insights():
    """Get AI-powered security insights."""
    try:
        if not config.OLLAMA_ENABLED:
            return jsonify({
                'insights': [],
                'message': 'AI insights disabled in configuration'
            })
        
        insights = ollama_service.get_security_insights()
        
        return jsonify({
            'insights': insights.get('insights', []),
            'summary': insights.get('summary', ''),
            'recommendations': insights.get('recommendations', []),
            'risk_assessment': insights.get('risk_assessment', {}),
            'timestamp': datetime.now().isoformat()
        })
        
    except Exception as e:
        return jsonify({
            'insights': [],
            'error': str(e)
        }), 500

What this does:

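  • Checks if AI features are enabled; if not, returns an empty insights list with an explanatory message
  • Calls the AI service to analyze detection patterns
  • Returns security insights, a summary, recommendations, and a risk assessment, using empty defaults for any missing keys
  • On any exception, returns an empty insights list plus the error message with HTTP status 500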

AI Explanations Endpoint

@app.route('/api/ai/explain/<session_id>', methods=['GET'])
@app.route('/ai/explain/<session_id>', methods=['GET'])
@app.route('/explain/<session_id>', methods=['GET'])
def ai_explain(session_id):
    """Get AI explanation for a detection session."""
    try:
        if not config.OLLAMA_ENABLED:
            return jsonify({
                'error': 'AI explanation feature is disabled'
            }), 503
        
        # Handle timestamp-based session IDs
        if 'T' in session_id and ('Z' in session_id or '+' in session_id):
            try:
                timestamp = datetime.fromisoformat(session_id.replace('Z', '+00:00'))
                session = DetectionSession.query.filter(
                    DetectionSession.upload_timestamp >= timestamp - timedelta(seconds=30),
                    DetectionSession.upload_timestamp <= timestamp + timedelta(seconds=30)
                ).first()
            except ValueError:
                return jsonify({'error': 'Invalid timestamp format'}), 400
        else:
            # Handle regular session IDs
            session = DetectionSession.query.filter_by(session_id=session_id).first()

What this does:

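  • Returns HTTP 503 (service unavailable) right away if AI features are disabled
  • Handles two types of session identifiers:
    • Regular session IDs (UUIDs)
    • Timestamp-based IDs (for frontend compatibility), recognized by a 'T' plus either 'Z' or '+'
  • For timestamps, takes the first session uploaded within 30 seconds before or after that time
  • Returns HTTP 400 if a timestamp-style ID cannot be parsed
  • For regular IDs, looks up the session directly in the database
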
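The branching on the identifier can be isolated from the database and tried on its own. The following is a minimal sketch under that assumption: `parse_session_lookup` is a hypothetical helper, not part of app.py, and it returns a description of the lookup rather than running a query.

```python
from datetime import datetime, timedelta


def parse_session_lookup(session_id, window_seconds=30):
    """Classify a session identifier.

    Returns ('timestamp', start, end) for timestamp-style IDs and
    ('id', session_id) otherwise. Raises ValueError if a timestamp-style
    ID cannot be parsed.
    """
    if 'T' in session_id and ('Z' in session_id or '+' in session_id):
        # Replacing 'Z' with an explicit offset keeps this working on Python
        # versions whose fromisoformat() does not accept a trailing 'Z'.
        timestamp = datetime.fromisoformat(session_id.replace('Z', '+00:00'))
        window = timedelta(seconds=window_seconds)
        return ('timestamp', timestamp - window, timestamp + window)
    return ('id', session_id)


by_time = parse_session_lookup('2025-06-23T12:00:00Z')
by_id = parse_session_lookup('3f2b8c1e-9d4a-4c7b-8e21-5a6f0b9c7d10')
```

The start and end values are what the endpoint passes to the two upload_timestamp comparisons. A lowercase UUID never contains an uppercase 'T', so it always takes the second branch.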
if not session:
    return jsonify({'error': 'Session not found'}), 404

detections = WeaponDetection.query.filter_by(session_id=session.session_id).all()

if not detections:
    return jsonify({
        'session_id': session_id,
        'explanation': 'No weapons detected in this session.',
        'confidence': 0,
        'risk_level': 'low'
    })

What this does:

  • Returns HTTP 404 if no matching session exists
  • Gets all weapon detections for that session
  • If there are no detections, returns a fixed low-risk response without calling the AI service

explanation_data = ollama_service.explain_detection(session.session_id)

return jsonify({
    'session_id': session_id,
    'explanation': explanation_data.get('explanation', 'Analysis not available'),
    'confidence': explanation_data.get('confidence', 0),
    'risk_level': explanation_data.get('risk_level', 'unknown'),
    'factors': explanation_data.get('factors', []),
    'recommendations': explanation_data.get('recommendations', []),
    'technical_details': explanation_data.get('technical_details', {})
})

What this does:

  • Calls AI service to generate explanation
  • Returns comprehensive analysis including:
    • Human-readable explanation
    • Confidence assessment
    • Risk level
    • Contributing factors
    • Security recommendations
    • Technical details
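Each `.get(key, default)` call above protects the response against keys the AI service did not return. A small sketch of the same idea, driven by one table of defaults, is shown below. `EXPLANATION_DEFAULTS` and `normalize_explanation` are hypothetical names, and the sample explanation text is made up.

```python
import copy

# Defaults taken from the .get() calls in the endpoint.
EXPLANATION_DEFAULTS = {
    'explanation': 'Analysis not available',
    'confidence': 0,
    'risk_level': 'unknown',
    'factors': [],
    'recommendations': [],
    'technical_details': {},
}


def normalize_explanation(data):
    """Return a dict with every expected key, filling gaps from the defaults."""
    # deepcopy gives each response its own list/dict instead of sharing the defaults.
    return {
        key: data[key] if key in data else copy.deepcopy(default)
        for key, default in EXPLANATION_DEFAULTS.items()
    }


# Made-up partial answer from the AI service.
partial = {'explanation': 'Knife-shaped object near the doorway', 'confidence': 0.85}
result = normalize_explanation(partial)
```

Keeping the defaults in one place makes it easier to see the full response shape at a glance.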

11. Main Application Startup 🚀

Application Initialization

if __name__ == '__main__':
    print("Starting Weapon Detection API with Database...")
    
    # Initialize the application
    if not initialize_app():
        print("Failed to initialize application. Exiting...")
        exit(1)
    
    # Create database tables
    if not create_database_tables():
        print("Failed to create database tables. Exiting...")
        exit(1)

What this does:

  • if __name__ == '__main__': means this code only runs when the file is executed directly
  • Calls initialization functions we defined earlier
  • If either initialization fails, exits the program with error code 1
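The "run each step, stop at the first failure" pattern can be sketched as a small function that returns an exit code. `run_startup` is a hypothetical helper and the lambdas are stand-ins for initialize_app() and create_database_tables(). In a script, the returned code would typically be passed to sys.exit(), which is the standard way to exit from a program (the bare exit() used above is a helper intended mainly for the interactive shell).

```python
def run_startup(steps):
    """Run (description, callable) steps in order.

    Returns 0 if every step returns a truthy value, or 1 as soon as one fails.
    """
    for description, step in steps:
        if not step():
            print(f"Failed to {description}. Exiting...")
            return 1
    return 0


# Stand-in steps; the second one simulates a failure.
ok_code = run_startup([
    ('initialize application', lambda: True),
    ('create database tables', lambda: True),
])

failed_code = run_startup([
    ('initialize application', lambda: True),
    ('create database tables', lambda: False),
])
```

Returning the code instead of exiting inside the function keeps the startup logic easy to test.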

AI Service Verification

# Check Ollama availability
if config.OLLAMA_ENABLED:
    if not ollama_service.is_ollama_available():
        print("Warning: Ollama service not available. AI features will be disabled.")
    elif not ollama_service.ensure_model_available():
        print(f"Warning: Model {config.OLLAMA_MODEL} not available. AI features may not work.")

What this does:

  • If AI features are enabled in config:
    • Checks if Ollama service is running
    • Verifies the AI model is available
    • Prints warnings if problems found (but doesn't crash)

Server Startup

print(f"Starting server on http://{config.HOST}:{config.PORT}")

try:
    app.run(
        host=config.HOST,
        port=config.PORT,
        debug=config.DEBUG,
        threaded=True
    )
except KeyboardInterrupt:
    print("\nServer stopped by user")
except Exception as e:
    print(f"Server error: {e}")
    exit(1)

What this does:

  • Prints the server URL
  • Starts Flask's built-in web server with the host, port, and debug settings from the config file
  • threaded=True handles each request in its own thread, so several requests can be served at the same time (this is also the default for Flask's built-in server since Flask 1.0)
  • Handles two types of shutdown:
    • KeyboardInterrupt: User presses Ctrl+C
    • Other exceptions: Unexpected errors
  • Exits with error code 1 if the server crashes

Key Concepts for Beginners 📚

What is Flask?

Flask is a web framework that lets you create websites and APIs. When you visit a URL like /health, Flask calls the function registered for that URL with @app.route and sends back whatever the function returns.

What is JSON?

JSON (JavaScript Object Notation) is a way to structure data that both humans and computers can read. It looks like:

{
  "name": "knife",
  "confidence": 0.85
}
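In Python, the standard json module converts between dictionaries and JSON text. A minimal sketch using the example above:

```python
import json

detection = {"name": "knife", "confidence": 0.85}

# dict -> JSON text (this is roughly what jsonify does before sending a response)
text = json.dumps(detection)

# JSON text -> dict (what a client does after receiving the response)
restored = json.loads(text)
```

After the round trip, `restored` is equal to the original dictionary.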

What is a Database Session?

Not to be confused with detection sessions! A database session keeps track of the changes you make to database records (adds, updates, deletes) and holds them until you call commit(), which saves them all at once. Calling rollback() discards the pending changes instead.
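The commit/rollback idea can be tried with Python's built-in sqlite3 module. This is only an analogy: app.py uses SQLAlchemy's db.session, not sqlite3 directly, and the table and rows below are made up.

```python
import sqlite3

# An in-memory database that disappears when the connection closes.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE detections (label TEXT, confidence REAL)')
conn.commit()

# Two changes saved together with one commit.
conn.execute('INSERT INTO detections VALUES (?, ?)', ('knife', 0.85))
conn.execute('INSERT INTO detections VALUES (?, ?)', ('pistol', 0.92))
conn.commit()

# A change that is discarded before it is ever saved.
conn.execute('INSERT INTO detections VALUES (?, ?)', ('false alarm', 0.10))
conn.rollback()

rows = conn.execute('SELECT label FROM detections ORDER BY label').fetchall()
conn.close()
```

Only the two committed rows remain; the rolled-back insert never reached the table.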

What is Error Handling?

The try/except blocks catch errors so the program doesn't crash. Instead, it can return a helpful error message.
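A minimal sketch of that pattern, outside Flask, is shown below. `average_response` is a hypothetical function that returns a (body, status code) pair, in the same shape the endpoints in this file return.

```python
def average_response(total, count):
    """Return (body, status) instead of letting an exception crash the program."""
    try:
        return {'average': total / count, 'status': 'success'}, 200
    except ZeroDivisionError as e:
        # The error is turned into a message the caller can display.
        return {'error': str(e), 'status': 'error'}, 500


ok_body, ok_status = average_response(10, 4)
error_body, error_status = average_response(10, 0)
```

The second call would normally raise ZeroDivisionError; here it produces an error body and a 500 status instead.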

What is YOLO?

YOLO (You Only Look Once) is an AI model that can detect objects in images very quickly. It returns bounding boxes around detected objects with confidence scores.
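To make "bounding boxes with confidence scores" concrete, the sketch below works with plain dictionaries shaped like simplified detection results. It does not call the ultralytics library; the data, the (x1, y1, x2, y2) box layout, and the helper names are assumptions for illustration.

```python
# Made-up detections: label, confidence (0 to 1), and box corners in pixels.
raw_detections = [
    {'label': 'knife', 'confidence': 0.85, 'box': (40, 60, 120, 200)},
    {'label': 'pistol', 'confidence': 0.32, 'box': (300, 80, 360, 150)},
]


def box_area(box):
    """Area in pixels of a box given as (x1, y1, x2, y2)."""
    x1, y1, x2, y2 = box
    return max(0, x2 - x1) * max(0, y2 - y1)


def filter_detections(detections, min_confidence=0.5):
    """Keep only detections at or above the confidence threshold."""
    return [d for d in detections if d['confidence'] >= min_confidence]


kept = filter_detections(raw_detections, min_confidence=0.5)
```

Raising the threshold reduces false alarms but risks missing real objects, which is why such a value is usually kept configurable.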

What is OpenCV?

OpenCV is a library for computer vision tasks like reading videos, processing images, and saving video clips.


Summary

This application is a sophisticated weapon detection system that:

  1. Accepts file uploads through a web API
  2. Processes images and videos using AI to detect weapons
  3. Stores results in a database for tracking
  4. Sends email alerts when weapons are detected
  5. Saves video clips of weapon detections
  6. Provides statistics and insights through various endpoints
  7. Integrates with AI chatbot for explanations and insights
  8. Handles errors gracefully to prevent crashes
  9. Supports multiple frontends through flexible endpoint aliases

The code is designed to be robust, scalable, and maintainable, with comprehensive error handling and logging throughout.

Published on June 23, 2025

Estimated reading time: 22 minutes
