"""On-board perception stack for the drone.

Bundles three model families behind one object:
  1. Face detection  - YuNet (ONNX) with Haar-cascade fallback + center-zoom pass.
  2. Person handling - torchvision SSDLite detector + MobileNet ReID embeddings.
  3. Obstacle zoning - MiDaS-small monocular depth, reduced to LEFT/CENTER/RIGHT
     proximity flags for the navigation layer.

All model loads are best-effort: a failed load disables that capability
(attribute set to None) instead of crashing the flight controller.
"""

import cv2
import torch
import numpy as np
import onnxruntime as ort
from PIL import Image
from typing import List, Optional, Tuple

from .config import Config

# ImageNet normalization constants shared by the ReID and depth ONNX models.
_IMAGENET_MEAN = np.array([0.485, 0.456, 0.406])
_IMAGENET_STD = np.array([0.229, 0.224, 0.225])


class DroneVision:
    """All vision inference for one drone; construct once, call per frame."""

    def __init__(self) -> None:
        # 1. Face detection: YuNet (ONNX) primary, Haar cascades as fallback.
        self.face_detector = None
        try:
            self.face_detector = cv2.FaceDetectorYN.create(
                "models/face_detection_yunet.onnx", "", (320, 320), 0.4, 0.3, 5000
            )
            print("[AI] YuNet Face Detector loaded.")
        except Exception as e:
            print(f"[AI Warning] YuNet failed to load: {e}")

        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.profile_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_profileface.xml')

        # 2. ONNX Runtime sessions for depth estimation and person ReID.
        #    Prefer CUDA when the provider is available, else CPU.
        try:
            providers = ['CPUExecutionProvider']
            if 'CUDAExecutionProvider' in ort.get_available_providers():
                providers.insert(0, 'CUDAExecutionProvider')
            self.depth_session = ort.InferenceSession(
                "models/midas_small.onnx", providers=providers)
            self.reid_session = ort.InferenceSession(
                "models/reid_mobilenet.onnx", providers=providers)
            print("[AI] Depth & ReID (ONNX) initialized.")
        except Exception as e:
            print(f"[AI Error] ONNX initialization failed: {e}")
            self.depth_session = None
            self.reid_session = None

        # 3. Person detection: torchvision SSDLite (COCO-pretrained).
        try:
            self.device = torch.device(
                "cuda" if torch.cuda.is_available() else "cpu")
            # Imported lazily so a broken torchvision install only disables
            # person detection instead of the whole module.
            from torchvision.models.detection import (
                ssdlite320_mobilenet_v3_large,
                SSDLite320_MobileNet_V3_Large_Weights,
            )
            weights = SSDLite320_MobileNet_V3_Large_Weights.DEFAULT
            self.person_model = ssdlite320_mobilenet_v3_large(
                weights=weights,
                box_score_thresh=Config.PERSON_CONF_THRESHOLD,
            ).to(self.device)
            self.person_model.eval()
            self.person_preprocess = weights.transforms()
        except Exception as e:
            print(f"[AI Error] Person detector failed: {e}")
            self.person_model = None

        # Last colorized depth map (BGR image) for UI overlay, or None.
        self.depth_map_vis = None
        # Per-zone obstacle flags (score above Config.DEPTH_THRESHOLD).
        self.zones = {"LEFT": False, "CENTER": False, "RIGHT": False}
        # Raw mean-depth scores backing the flags above.
        self.zone_scores = {"LEFT": 0.0, "CENTER": 0.0, "RIGHT": 0.0}

    @staticmethod
    def _is_new_box(x: float, y: float, faces: List[Tuple]) -> bool:
        """Return True if top-left (x, y) is not near any already-known face.

        Two boxes are treated as the same detection when their top-left
        corners differ by less than half the known face's width/height.
        """
        for (fx, fy, fw, fh) in faces:
            if abs(x - fx) < fw / 2 and abs(y - fy) < fh / 2:
                return False
        return True

    @staticmethod
    def _imagenet_blob(img_rgb: np.ndarray, size: Tuple[int, int]) -> np.ndarray:
        """Resize an RGB image and pack it as a normalized NCHW float32 blob.

        Applies the standard ImageNet mean/std normalization expected by both
        the ReID and the MiDaS ONNX models.
        """
        resized = cv2.resize(img_rgb, size)
        normed = (resized.astype(np.float32) / 255.0 - _IMAGENET_MEAN) / _IMAGENET_STD
        chw = np.transpose(normed, (2, 0, 1)).astype(np.float32)
        return np.expand_dims(chw, axis=0)

    def _detect_haar(self, img_gray: np.ndarray) -> List[Tuple]:
        """Run frontal + profile Haar cascades on a grayscale image.

        Profile hits that coincide with an already-found frontal face are
        discarded. Returns (x, y, w, h) tuples.
        """
        faces = [tuple(f) for f in self.face_cascade.detectMultiScale(
            img_gray, 1.1, 7, minSize=(30, 30))]
        profiles = self.profile_cascade.detectMultiScale(
            img_gray, 1.1, 8, minSize=(35, 35))
        for p in profiles:
            px, py = p[0], p[1]
            if self._is_new_box(px, py, faces):
                faces.append(tuple(p))
        return faces

    def detect_faces(self, frame: np.ndarray) -> List[Tuple]:
        """Detect faces in a BGR frame; returns (x, y, w, h) tuples.

        Three passes, merged by corner proximity:
          1. YuNet on the full frame (if loaded),
          2. Haar cascades on the full frame (fallback / supplement),
          3. YuNet + Haar on a 2x-upscaled center crop, to catch very
             distant faces; zoom coordinates are mapped back to the frame.
        """
        h, w = frame.shape[:2]
        faces: List[Tuple] = []

        # Pass 1: YuNet on the full frame (best-effort).
        if self.face_detector is not None:
            try:
                self.face_detector.setInputSize((w, h))
                _, detections = self.face_detector.detect(frame)
                if detections is not None:
                    for det in detections:
                        faces.append(tuple(det[0:4].astype(int)))
            except Exception:
                pass  # YuNet failure must not disable the Haar fallback

        # Pass 2: Haar fallback on the full frame.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (3, 3), 0)
        for hf in self._detect_haar(gray):
            if self._is_new_box(hf[0], hf[1], faces):
                faces.append(hf)

        # Pass 3: 2x upscaled center crop for extreme-distance faces.
        zh, zw = int(h * 0.4), int(w * 0.4)
        cy, cx = h // 2, w // 2
        y1, y2 = cy - zh // 2, cy + zh // 2
        x1, x2 = cx - zw // 2, cx + zw // 2
        center_crop = frame[y1:y2, x1:x2]
        center_upscaled = cv2.resize(
            center_crop, (zw * 2, zh * 2), interpolation=cv2.INTER_LANCZOS4)

        if self.face_detector is not None:
            try:
                uh, uw = center_upscaled.shape[:2]
                self.face_detector.setInputSize((uw, uh))
                _, zoom_detections = self.face_detector.detect(center_upscaled)
                if zoom_detections is not None:
                    for det in zoom_detections:
                        zx, zy, zw_f, zh_f = det[0:4]
                        # Map zoom coordinates back into the full frame
                        # (divide by the 2x upscale, offset by crop origin).
                        rx, ry = int(zx / 2) + x1, int(zy / 2) + y1
                        rw, rh = int(zw_f / 2), int(zh_f / 2)
                        if self._is_new_box(rx, ry, faces):
                            faces.append((rx, ry, rw, rh))
            except Exception:
                pass  # zoom pass is opportunistic

        # Always run Haar on the zoom crop too, for robustness.
        zoom_gray = cv2.cvtColor(center_upscaled, cv2.COLOR_BGR2GRAY)
        for (zx, zy, zw_f, zh_f) in self._detect_haar(zoom_gray):
            rx, ry = int(zx / 2) + x1, int(zy / 2) + y1
            rw, rh = int(zw_f / 2), int(zh_f / 2)
            if self._is_new_box(rx, ry, faces):
                faces.append((rx, ry, rw, rh))

        return faces

    def detect_persons(self, frame: np.ndarray) -> List[Tuple]:
        """Detect people in a BGR frame via SSDLite.

        Returns (x, y, w, h) boxes for COCO class 1 (person) scoring above
        Config.PERSON_CONF_THRESHOLD; empty list if the model failed to load.
        """
        if self.person_model is None:
            return []
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(img_rgb)
        input_tensor = self.person_preprocess(pil_img).to(self.device).unsqueeze(0)
        with torch.no_grad():
            output = self.person_model(input_tensor)[0]
        persons = []
        for label, score, box_t in zip(
                output['labels'], output['scores'], output['boxes']):
            if label == 1 and score > Config.PERSON_CONF_THRESHOLD:
                box = box_t.cpu().numpy().astype(int)
                # Convert (x1, y1, x2, y2) to (x, y, w, h).
                persons.append(
                    (box[0], box[1], box[2] - box[0], box[3] - box[1]))
        return persons

    def extract_person_features(
            self, frame: np.ndarray, box: Tuple) -> Optional[np.ndarray]:
        """Compute a ReID embedding for the person inside `box` (x, y, w, h).

        The box is clipped to the frame; returns a flat feature vector, or
        None when ReID is unavailable, the box is degenerate, or inference
        fails.
        """
        if self.reid_session is None:
            return None
        try:
            x, y, w, h = box
            x1, y1 = max(0, x), max(0, y)
            x2, y2 = min(frame.shape[1], x + w), min(frame.shape[0], y + h)
            if x2 <= x1 or y2 <= y1:
                return None
            crop = frame[y1:y2, x1:x2]
            img_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
            blob = self._imagenet_blob(img_rgb, (224, 224))
            ort_inputs = {self.reid_session.get_inputs()[0].name: blob}
            features = self.reid_session.run(None, ort_inputs)[0]
            return features.flatten()
        except Exception:
            return None  # best-effort: tracking degrades, flight continues

    def compare_features(self, feat1: np.ndarray, feat2: np.ndarray) -> float:
        """Cosine similarity of two ReID embeddings; 0.0 on any failure."""
        if feat1 is None or feat2 is None:
            return 0.0
        try:
            norm1 = np.linalg.norm(feat1)
            norm2 = np.linalg.norm(feat2)
            if norm1 == 0 or norm2 == 0:
                return 0.0
            return np.dot(feat1, feat2) / (norm1 * norm2)
        except Exception:
            return 0.0

    def estimate_depth_and_radar(self, frame: np.ndarray) -> None:
        """Run MiDaS depth on the vertical center band and update zone state.

        Side effects only: sets self.depth_map_vis (MAGMA-colored depth for
        UI), self.zone_scores and self.zones for LEFT/CENTER/RIGHT thirds.
        Higher MiDaS output means closer, so a zone flags True when its mean
        normalized depth exceeds Config.DEPTH_THRESHOLD.
        """
        if self.depth_session is None:
            return
        try:
            h, w = frame.shape[:2]
            # Analyze only the middle 50% of rows (sky/ground excluded).
            slice_y1, slice_y2 = int(h * 0.25), int(h * 0.75)
            analysis_area = frame[slice_y1:slice_y2, :]
            img_rgb = cv2.cvtColor(analysis_area, cv2.COLOR_BGR2RGB)
            blob = self._imagenet_blob(img_rgb, (256, 256))
            ort_inputs = {self.depth_session.get_inputs()[0].name: blob}
            pred = self.depth_session.run(None, ort_inputs)[0][0]
            pred_upscaled = cv2.resize(
                pred, (analysis_area.shape[1], analysis_area.shape[0]),
                interpolation=cv2.INTER_CUBIC)

            out_norm = cv2.normalize(
                pred_upscaled, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
            self.depth_map_vis = cv2.applyColorMap(out_norm, cv2.COLORMAP_MAGMA)

            # Normalize to [0, 1]; epsilon guards an all-zero prediction.
            depth_map = pred_upscaled / (np.max(pred_upscaled) + 1e-5)
            dh, dw = depth_map.shape
            zone_w = dw // 3
            for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]):
                zx1, zx2 = i * zone_w, (i + 1) * zone_w
                # Score only the middle half of rows within the band.
                score = np.mean(depth_map[dh // 4:3 * dh // 4, zx1:zx2])
                self.zone_scores[name] = score
                self.zones[name] = score > Config.DEPTH_THRESHOLD
        except Exception:
            pass  # keep last good zone state rather than abort the frame