video_objects/object_detection/yolo5blacks.py
2025-04-02 08:32:58 -04:00

111 lines
4 KiB
Python
Executable file

#!/usr/bin/env python3
import cv2
import torch
import yt_dlp
import subprocess
import time
import os
import numpy as np
from flask import Flask, Response
import warnings
# Suppress FutureWarning messages for the whole process.
warnings.simplefilter("ignore", category=FutureWarning)

# Flask application that exposes the processed video stream.
app = Flask(__name__)

# --- Configuration -------------------------------------------------------
YOUTUBE_URL = "https://www.youtube.com/watch?v=t45_gP7I82I"  # stream URL
CONFIDENCE_THRESHOLD = 0.3  # detections must score above this to be kept
MODEL = "yolov5s"  # YOLO model version (yolov5s, yolov5m, etc.)
OBJECT_LIST_FILE = "objectlist.txt"  # one object name per line

# --- YOLOv5 model --------------------------------------------------------
print("🔄 Loading YOLOv5 model...")
model = torch.hub.load(
    "ultralytics/yolov5",
    "custom",
    path=MODEL,
    force_reload=True,
)
print("✅ YOLOv5 loaded successfully!")

# --- Allowed object names ------------------------------------------------
# Names are stripped and lower-cased; blank lines are skipped. A missing
# file yields an empty set.
if not os.path.exists(OBJECT_LIST_FILE):
    OBJECT_LIST = set()
else:
    with open(OBJECT_LIST_FILE, "r") as f:
        OBJECT_LIST = {line.strip().lower() for line in f if line.strip()}
def get_stream_url():
    """Fetch a fresh 720p YouTube stream URL using yt-dlp.

    Returns:
        The ``url`` field of the info yt-dlp extracts for ``YOUTUBE_URL``,
        or ``None`` when extraction fails, yields nothing, or the result
        has no ``url`` field.
    """
    # Video-only format whose height is exactly 720; nothing is downloaded.
    ydl_opts = {'quiet': True, 'format': 'bestvideo[height=720]'}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(YOUTUBE_URL, download=False)
    except yt_dlp.utils.DownloadError as err:
        # Extraction errors (e.g. requested format unavailable) are turned
        # into None so the caller's falsy check can report the failure
        # instead of the exception escaping mid-response.
        print(f"yt-dlp could not resolve the stream URL: {err}")
        return None
    if not info_dict:
        return None
    return info_dict.get("url")
def _mask_detections(frame, detections, padding=75):
    """Return a black image the size of *frame* showing only allowed objects.

    A detection is kept when its confidence is above ``CONFIDENCE_THRESHOLD``
    and its lower-cased name is in ``OBJECT_LIST``. Each kept bounding box is
    grown by *padding* pixels on every side, clamped to the frame, and the
    corresponding pixels are copied from *frame* into the result.
    """
    frame_height, frame_width = frame.shape[:2]
    mask = np.zeros_like(frame)  # fresh writable array, all black
    for _, row in detections.iterrows():
        if row["confidence"] > CONFIDENCE_THRESHOLD and row["name"].lower() in OBJECT_LIST:
            x1, y1, x2, y2 = int(row["xmin"]), int(row["ymin"]), int(row["xmax"]), int(row["ymax"])
            # Expand the bounding box, keeping it inside the frame.
            x1, y1 = max(0, x1 - padding), max(0, y1 - padding)
            x2, y2 = min(frame_width, x2 + padding), min(frame_height, y2 + padding)
            mask[y1:y2, x1:x2] = frame[y1:y2, x1:x2]
    return mask


def generate_frames():
    """Capture video frames, apply object detection, and stream as MJPEG.

    Yields:
        One multipart chunk per processed frame: the ``--frame`` boundary,
        a JPEG content-type header, and the JPEG bytes. Everything outside
        the (padded) boxes of allowed detections is blacked out.
    """
    stream_url = get_stream_url()
    if not stream_url:
        print(" Failed to fetch stream URL!")
        return

    frame_width, frame_height = 1280, 720  # Set video frame size
    frame_bytes = frame_width * frame_height * 3  # bgr24: 3 bytes per pixel

    print("🎥 Starting FFmpeg stream...")
    ffmpeg_process = subprocess.Popen([
        "ffmpeg", "-re", "-i", stream_url,
        # Force the output size so every frame is exactly frame_bytes long,
        # whatever resolution the source actually delivers.
        "-vf", f"scale={frame_width}:{frame_height}",
        "-f", "rawvideo", "-pix_fmt", "bgr24", "pipe:1"
    ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8)

    try:
        while True:
            raw_frame = ffmpeg_process.stdout.read(frame_bytes)  # Read raw BGR frame
            # A short read means EOF (possibly mid-frame); reshaping it would raise.
            if len(raw_frame) < frame_bytes:
                print("No frame received!")
                break
            frame = np.frombuffer(raw_frame, np.uint8).reshape((frame_height, frame_width, 3))

            # Run YOLO object detection. The hub model's documented OpenCV
            # usage reverses the channel axis (BGR -> RGB) before inference.
            results = model(frame[..., ::-1])
            detections = results.pandas().xyxy[0]  # Convert detections to Pandas DataFrame

            if detections.empty:
                print("No objects detected in this frame.")
            else:
                print(f"Detected {len(detections)} objects!")
                print(detections[["name", "confidence"]])  # Print detected object names and confidence

            # With no detections this is simply an all-black frame.
            frame = _mask_detections(frame, detections)

            # Encode and yield the frame as JPEG; skip frames that fail to encode.
            encoded_ok, buffer = cv2.imencode('.jpg', frame)
            if not encoded_ok:
                continue
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal exit, on errors, and when the generator is closed
        # (e.g. the client disconnects), so ffmpeg never outlives the request.
        ffmpeg_process.terminate()
        ffmpeg_process.stdout.close()
        try:
            ffmpeg_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            ffmpeg_process.kill()
            ffmpeg_process.wait()
@app.route('/video')
def video_feed():
    """Serve the processed frames as a multipart MJPEG response."""
    mjpeg_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    frame_stream = generate_frames()
    return Response(frame_stream, mimetype=mjpeg_mimetype)
if __name__ == '__main__':
    print("Running at http://localhost:5000/video") #port
    # Debug mode stays off: the server listens on all interfaces, and the
    # interactive debugger would let any client that can reach it run
    # arbitrary Python. It also avoids the reloader importing this module
    # (and loading the model) a second time.
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)