128 lines
4.7 KiB
Python
Executable file
128 lines
4.7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import cv2
|
|
import torch
|
|
import yt_dlp
|
|
import subprocess
|
|
import time
|
|
import os
|
|
import numpy as np
|
|
from datetime import datetime
|
|
from flask import Flask, Response
|
|
import warnings
|
|
|
|
# WARNING: detected objects (filtered by objectlist.txt when that file exists)
# are written to disk as cropped images under detections/<label>/.

# Keep FutureWarning noise (raised by torch) out of the console.
warnings.simplefilter("ignore", category=FutureWarning)

app = Flask(__name__)

# Stream URL
YOUTUBE_URL = "https://www.youtube.com/watch?v=t45_gP7I82I"

# Confidence threshold: detections at or below this score are ignored
CONFIDENCE_THRESHOLD = 0.4

# YOLO model version
MODEL = "yolov5s"
|
|
|
|
# The hub repository is re-fetched on every start (force_reload=True) so the
# model code stays up to date; switch to force_reload=False to reuse the
# locally cached copy instead.
print("🔄 Loading YOLOv5 model...")
model = torch.hub.load(
    "ultralytics/yolov5",
    "custom",
    path=MODEL,
    force_reload=True,
)
print("✅ YOLOv5 loaded successfully!")
|
|
|
|
# Load the optional allow-list of object names (one per line, matched
# case-insensitively). An empty set means "do not filter".
OBJECT_LIST_FILE = "objectlist.txt"
try:
    # Open directly instead of checking os.path.exists() first: this avoids
    # the check-then-open race, and the explicit encoding keeps decoding
    # independent of the platform's default locale.
    with open(OBJECT_LIST_FILE, "r", encoding="utf-8") as f:
        OBJECT_NAMES = {line.strip().lower() for line in f if line.strip()}
except FileNotFoundError:
    print(f"⚠️ {OBJECT_LIST_FILE} not found! No filtering will be applied.")
    OBJECT_NAMES = set()  # Empty set (detects all objects)
else:
    print(f"📜 Loaded {len(OBJECT_NAMES)} objects from {OBJECT_LIST_FILE}")

# Ensure the root output directory for saved crops exists
os.makedirs("detections", exist_ok=True)
|
|
|
|
def get_stream_url():
    """Fetch 720p YouTube stream URL using yt-dlp.

    Returns:
        The ``"url"`` entry of the metadata yt-dlp resolves for
        ``YOUTUBE_URL`` with the ``bestvideo[height=720]`` format selector,
        or ``None`` when yt-dlp cannot resolve the video/format or the
        metadata carries no URL.
    """
    ydl_opts = {'quiet': True, 'format': 'bestvideo[height=720]'}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(YOUTUBE_URL, download=False)
    except yt_dlp.utils.DownloadError as err:
        # The caller treats a falsy result as "no stream"; report the reason
        # here instead of letting the exception escape into the response
        # generator.
        print(f"❌ yt-dlp could not resolve the stream: {err}")
        return None

    if not info_dict:
        return None
    return info_dict.get("url")
|
|
|
|
def save_cropped_object(frame, bbox, label, *, padding=50, output_dir="detections"):
    """Save a padded crop of a detected object in its per-label folder.

    Args:
        frame: Image array indexed as ``[row, column, ...]``.
        bbox: ``(x1, y1, x2, y2)`` integer pixel corners of the detection.
        label: Object name; used as the sub-folder name under *output_dir*.
        padding: Pixels added on every side of *bbox* before cropping,
            clamped to the frame bounds. Defaults to 50.
        output_dir: Root folder for saved crops. Defaults to ``"detections"``.

    Returns:
        The path of the written JPEG, or ``None`` when nothing was saved
        (the padded box does not overlap the frame, or the image could not
        be written).
    """
    x1, y1, x2, y2 = bbox
    h, w = frame.shape[:2]

    # Grow the box by `padding` while keeping it inside the image bounds.
    x1, y1 = max(0, x1 - padding), max(0, y1 - padding)
    x2, y2 = min(w, x2 + padding), min(h, y2 + padding)

    # A box outside the frame (or an inverted one) selects no pixels. Bail
    # out before creating folders: cv2.imwrite rejects empty images.
    if x2 <= x1 or y2 <= y1:
        return None

    cropped = frame[y1:y2, x1:x2]

    # Create folder for the object if it doesn't exist
    save_dir = os.path.join(output_dir, label)
    os.makedirs(save_dir, exist_ok=True)

    # Microsecond timestamp keeps file names unique across rapid detections.
    filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg"
    save_path = os.path.join(save_dir, filename)

    # imwrite signals failure through its return value rather than raising.
    if not cv2.imwrite(save_path, cropped):
        print(f"❌ Failed to save: {save_path}")
        return None

    print(f"📸 Saved: {save_path}")
    return save_path
|
|
|
|
def generate_frames():
    """Capture video frames, apply object detection, and stream as MJPEG.

    Resolves the stream URL, lets FFmpeg decode it into raw BGR frames on a
    pipe, runs the YOLO model on every frame, saves a crop of each accepted
    detection and yields the annotated frame as one part of a
    ``multipart/x-mixed-replace`` body.

    Yields:
        bytes: One ``--frame`` multipart chunk containing a JPEG image.
        Nothing is yielded when the stream URL cannot be resolved.
    """
    stream_url = get_stream_url()
    if not stream_url:
        print("❌ Failed to fetch stream URL!")
        return

    frame_width, frame_height = 1280, 720  # Expecting 720p stream
    frame_size = frame_width * frame_height * 3  # bytes per bgr24 frame

    print("🎥 Starting FFmpeg stream...")
    ffmpeg_process = subprocess.Popen([
        "ffmpeg", "-re", "-i", stream_url,
        # Force the output size so every decoded frame matches the reshape
        # below even if the source is 720 pixels high but not 1280 wide.
        "-s", f"{frame_width}x{frame_height}",
        "-f", "rawvideo", "-pix_fmt", "bgr24", "pipe:1"
    ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8)

    try:
        while True:
            raw_frame = ffmpeg_process.stdout.read(frame_size)  # Read raw BGR frame
            # At end of stream the last read may be empty or truncated; a
            # partial buffer cannot be reshaped into a frame.
            if len(raw_frame) < frame_size:
                print("❌ No frame received!")
                break

            # np.frombuffer returns a read-only view of the bytes object.
            # Keep it as the pristine source for crops and draw on a
            # writable copy.
            frame = np.frombuffer(raw_frame, np.uint8).reshape((frame_height, frame_width, 3))
            annotated = frame.copy()

            # Run YOLO object detection. The YOLOv5 hub wrapper expects RGB
            # channel order for array input, while FFmpeg delivers BGR.
            results = model(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            detections = results.pandas().xyxy[0]  # Detections as a DataFrame

            if detections.empty:
                print("No objects detected.")
            else:
                print(f"✅ Detected {len(detections)} objects!")

            for _, row in detections.iterrows():
                label = row["name"].lower()
                confidence = row["confidence"]

                if not confidence > CONFIDENCE_THRESHOLD:
                    continue
                # An empty OBJECT_NAMES set disables filtering.
                if OBJECT_NAMES and label not in OBJECT_NAMES:
                    continue

                x1, y1, x2, y2 = (int(row[key]) for key in ("xmin", "ymin", "xmax", "ymax"))

                # Crop from the untouched frame so saved images never contain
                # boxes drawn for other detections in the same frame.
                save_cropped_object(frame, (x1, y1, x2, y2), label)

                # Draw bounding box and caption in green on the output frame
                cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(annotated, f"{label} ({confidence:.2f})", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # Encode and present the frame as JPEG
            encoded_ok, buffer = cv2.imencode('.jpg', annotated)
            if not encoded_ok:
                continue  # skip this frame rather than emit a broken part
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal exit and when the client disconnects (the generator
        # is closed), so the FFmpeg child never outlives the response.
        ffmpeg_process.terminate()
        ffmpeg_process.stdout.close()
        try:
            ffmpeg_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            ffmpeg_process.kill()
            ffmpeg_process.wait()
|
|
|
|
|
|
@app.route('/video')
def video_feed():
    """Serve the processed frames as a multipart MJPEG response."""
    frame_stream = generate_frames()
    return Response(
        frame_stream,
        mimetype='multipart/x-mixed-replace; boundary=frame',
    )
|
|
|
|
if __name__ == '__main__':
    print("Running at http://localhost:5000/video")
    # The server listens on every interface, so the interactive debugger
    # (which lets a client execute arbitrary code) must not be enabled by
    # default. Debug mode also starts the reloader, which imports this
    # module, and therefore loads the model, a second time.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled, threaded=True)
|
|
|