#!/usr/bin/env python3 import cv2 import torch import yt_dlp import subprocess import time import os import numpy as np from datetime import datetime from flask import Flask, Response import warnings # Warning! saves objects in objectlist.txt to file /detections/$object warnings.simplefilter("ignore", category=FutureWarning) #ignores torch warning app = Flask(__name__) YOUTUBE_URL = "https://www.youtube.com/watch?v=t45_gP7I82I" # Stream URL CONFIDENCE_THRESHOLD = 0.4 # Confidence threshold MODEL = "yolov5s" # YOLO model version # Load YOLOv5 model everytime, this keeps it updated, make force_reload = False if not. print("🔄 Loading YOLOv5 model...") model = torch.hub.load("ultralytics/yolov5", "custom", path=MODEL, force_reload=True) print("✅ YOLOv5 loaded successfully!") # Load object list from file OBJECT_LIST_FILE = "objectlist.txt" if os.path.exists(OBJECT_LIST_FILE): with open(OBJECT_LIST_FILE, "r") as f: OBJECT_NAMES = set(line.strip().lower() for line in f if line.strip()) print(f"📜 Loaded {len(OBJECT_NAMES)} objects from {OBJECT_LIST_FILE}") else: print(f"⚠️ {OBJECT_LIST_FILE} not found! No filtering will be applied.") OBJECT_NAMES = set() # Empty set (detects all objects) # Ensure directories exist os.makedirs("detections", exist_ok=True) def get_stream_url(): """Fetch 720p YouTube stream URL using yt-dlp.""" ydl_opts = {'quiet': True, 'format': 'bestvideo[height=720]'} with yt_dlp.YoutubeDL(ydl_opts) as ydl: info_dict = ydl.extract_info(YOUTUBE_URL, download=False) return info_dict.get("url", None) def save_cropped_object(frame, bbox, label): """Save cropped object with 50px padding in its respective folder.""" x1, y1, x2, y2 = bbox h, w, _ = frame.shape # Add 50px padding while ensuring within image bounds x1, y1 = max(0, x1 - 50), max(0, y1 - 50) x2, y2 = min(w, x2 + 50), min(h, y2 + 50) cropped = frame[y1:y2, x1:x2] # Crop the bounding box # Create folder for the object if it doesn't exist save_dir = os.path.join("detections", label) os.makedirs(save_dir, exist_ok=True) # Save with timestamp filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg" cv2.imwrite(os.path.join(save_dir, filename), cropped) print(f"📸 Saved: {os.path.join(save_dir, filename)}") def generate_frames(): """Capture video frames, apply object detection, and stream as MJPEG.""" stream_url = get_stream_url() if not stream_url: print("❌ Failed to fetch stream URL!") return print("🎥 Starting FFmpeg stream...") ffmpeg_process = subprocess.Popen([ "ffmpeg", "-re", "-i", stream_url, "-f", "rawvideo", "-pix_fmt", "bgr24", "pipe:1" ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8) frame_width, frame_height = 1280, 720 # Expecting 720p stream while True: raw_frame = ffmpeg_process.stdout.read(frame_width * frame_height * 3) # Read raw BGR frame if not raw_frame: print("❌ No frame received!") break frame = np.frombuffer(raw_frame, np.uint8).reshape((frame_height, frame_width, 3)) # Convert to NumPy array # Run YOLO object detection results = model(frame) detections = results.pandas().xyxy[0] # Convert detections to Pandas DataFrame if detections.empty: print("No objects detected.") else: print(f"✅ Detected {len(detections)} objects!") for _, row in detections.iterrows(): label = row["name"].lower() confidence = row["confidence"] if confidence > CONFIDENCE_THRESHOLD and (not OBJECT_NAMES or label in OBJECT_NAMES): x1, y1, x2, y2 = int(row["xmin"]), int(row["ymin"]), int(row["xmax"]), int(row["ymax"]) save_cropped_object(frame, (x1, y1, x2, y2), label) # Draw bounding boxes cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # Green box cv2.putText(frame, f"{label} ({confidence:.2f})", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) # Encode and present the frame as JPEG _, buffer = cv2.imencode('.jpg', frame) yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n') @app.route('/video') def video_feed(): """Stream processed video frames.""" return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') if __name__ == '__main__': print("Running at http://localhost:5000/video") app.run(host='0.0.0.0', port=5000, debug=True, threaded=True)