import cv2 import numpy as np from tello_sim_client import TelloSimClient import time import torch # Deep Learning import torchvision.transforms as T # --- Configuration --- WINDOW_WIDTH = 960 WINDOW_HEIGHT = 720 TARGET_FACE_SIZE = 120 FACE_CENTER_X = WINDOW_WIDTH // 2 FACE_CENTER_Y = WINDOW_HEIGHT // 2 TARGET_ALTITUDE = 1.5 # meters KP_ALTITUDE = 60 # Multi-Zone Radar & DL Config OBSTACLE_THRESHOLD_TOF = 75 DEPTH_THRESHOLD = 0.75 # Höherer Schwellenwert (weniger empfindlich) # --- Colors (BGR) --- COLOR_GREEN = (0, 255, 0) COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) COLOR_WHITE = (255, 255, 255) class FaceTrackingApp: def __init__(self): print("[App] Initializing TelloSimClient...") self.tello = TelloSimClient() self.tello.connect() self.tello.streamon() # Load Face Detector cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' self.face_cascade = cv2.CascadeClassifier(cascade_path) # Load MiDaS Model print("[AI] Loading MiDaS DL Model...") try: self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") self.model = torch.hub.load("intel-isl/MiDaS", "MiDaS_small").to(self.device) self.model.eval() self.transform = torch.hub.load("intel-isl/MiDaS", "transforms").small_transform except Exception as e: print(f"[AI] Error: {e}") self.model = None self.is_running = True self.is_manual = False self.is_orbiting = False self.zones = {"LEFT": False, "CENTER": False, "RIGHT": False} self.zone_scores = {"LEFT": 0.0, "CENTER": 0.0, "RIGHT": 0.0} self.status = "INIT" self.depth_map_vis = None # Position Tracking self.pos_x, self.pos_z = 0.0, 0.0 self.path_history = [(0.0, 0.0)] self.last_update_time = time.time() self.current_yaw = 0.0 self.current_height = 0.0 # Velocity Tracking self.last_sent_rc = [0, 0, 0, 0] # lr, fb, ud, yv self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0 self.avoidance_active = False self.avoidance_dir = 0 # -1 for right, 1 for left self.center_blocked = False self.search_start = time.time() cv2.namedWindow("Tello AI Pilot (Stable Mode)", cv2.WINDOW_NORMAL) cv2.resizeWindow("Tello AI Pilot (Stable Mode)", WINDOW_WIDTH, WINDOW_HEIGHT) def estimate_depth(self, frame): if self.model is None: return None # Wir analysieren nur den mittleren Streifen (kein Boden, kein Himmel) analysis_area = frame[200:520, :] # Fokus auf Augenhöhe img = cv2.cvtColor(analysis_area, cv2.COLOR_BGR2RGB) input_batch = self.transform(img).to(self.device) with torch.no_grad(): pred = self.model(input_batch) pred = torch.nn.functional.interpolate(pred.unsqueeze(1), size=analysis_area.shape[:2], mode="bicubic", align_corners=False).squeeze() out = pred.cpu().numpy() out_norm = cv2.normalize(out, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U) self.depth_map_vis = cv2.applyColorMap(out_norm, cv2.COLORMAP_MAGMA) return out / (np.max(out) + 1e-5) def run_radar_scan(self, depth_map): if depth_map is None: return h, w = depth_map.shape zone_w = w // 3 for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]): zx_start, zx_end = i * zone_w, (i + 1) * zone_w # Durchschnittliche Tiefe im Zentrum der Zone score = np.mean(depth_map[h//4 : 3*h//4, zx_start:zx_end]) self.zone_scores[name] = score self.zones[name] = score > DEPTH_THRESHOLD def update_telemetry(self, frame): try: self.current_yaw = float(self.tello.get_yaw()) raw_height = float(self.tello.get_height()) # Auto-Scale detection (Simulator = m, Real = cm) self.current_height = raw_height if abs(raw_height) < 10 else raw_height / 100.0 dm = self.estimate_depth(frame) if dm is not None: self.run_radar_scan(dm) # Dead Reckoning now = time.time() dt = now - self.last_update_time self.last_update_time = now # Simulator Scale (0.6 ist ein guter Annäherungswert für cm/s) sf = 0.6 rad = np.radians(self.current_yaw) lr, fb = self.last_sent_rc[0], self.last_sent_rc[1] vx = fb * np.sin(rad) + lr * np.cos(rad) vz = fb * np.cos(rad) - lr * np.sin(rad) self.pos_x += vx * sf * dt self.pos_z += vz * sf * dt if np.linalg.norm(np.array([self.pos_x, self.pos_z]) - np.array(self.path_history[-1])) > 0.5: self.path_history.append((self.pos_x, self.pos_z)) except: pass def process_flight(self, faces): lr, fb, ud, yv = 0, 0, 0, 0 # Terrain Following (Altitude in m) if not self.is_manual: ud = int(np.clip((TARGET_ALTITUDE - self.current_height) * KP_ALTITUDE, -40, 40)) # --- AI PILOT --- # TOF Sensor Check try: tof = int(self.tello.get_distance_tof()) except: tof = 1000 self.center_blocked = self.zones["CENTER"] or tof < OBSTACLE_THRESHOLD_TOF if self.center_blocked: self.status = "!!! OBSTACLE !!!" fb = 0 # Stick to a direction if already avoiding if not self.avoidance_active: self.avoidance_dir = 1 if self.zone_scores["LEFT"] < self.zone_scores["RIGHT"] else -1 self.avoidance_active = True yv = 45 * self.avoidance_dir elif self.avoidance_active: self.status = "RECOVERING" fb = 15; yv = 20 * self.avoidance_dir if not self.zones["CENTER"]: self.avoidance_active = False self.avoidance_dir = 0 elif self.is_manual: self.status = "MANUAL" lr, fb, ud, yv = self.m_lr, self.m_fb, self.m_ud, self.m_yv elif len(faces) > 0: self.search_start = time.time() (x, y, w, h) = max(faces, key=lambda f: f[2] * f[3]) yv_err = (x + w // 2) - FACE_CENTER_X yv = int(np.clip(0.18 * yv_err, -60, 60)) fb_raw = int(np.clip(0.5 * (TARGET_FACE_SIZE - w), -50, 50)) # Sanftere Reduzierung bei Drehung fb = int(fb_raw * 0.5) if abs(yv_err) > 60 else fb_raw if self.is_orbiting: self.status = "AI ORBIT"; lr = 35 else: self.status = "AI TRACKING" else: # SUCHE: "Walk and Look" Muster elapsed = (time.time() - self.search_start) % 4.0 if elapsed < 2.8: self.status = "SEARCH: FORWARD" fb = 35; yv = 8 else: self.status = "SEARCH: TURNING" fb = 5; yv = 35 self.last_sent_rc = [lr, fb, ud, yv] self.tello.send_rc_control(lr, fb, ud, yv) def draw_hud(self, frame, b): try: if hasattr(self.tello, 'is_flying'): state = "FLYING" if self.tello.is_flying else "LANDED" else: state = self.tello.get_current_state() except: state = "ERROR" if self.depth_map_vis is not None: mini = cv2.resize(self.depth_map_vis, (160, 120)) frame[60:180, 20:180] = mini cv2.rectangle(frame, (20, 60), (180, 180), COLOR_WHITE, 1) cv2.rectangle(frame, (0, 0), (WINDOW_WIDTH, 60), (0,0,0), -1) cv2.putText(frame, f"{self.status} | {state.upper()} | ALT: {self.current_height:.1f}m | BAT: {b}%", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLOR_GREEN if not self.center_blocked else COLOR_RED, 2) cv2.putText(frame, f"RC: {self.last_sent_rc}", (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_WHITE, 1) # Radar Score Debugging for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]): col = COLOR_RED if self.zones[name] else COLOR_GREEN val = self.zone_scores[name] cv2.putText(frame, f"{name[:1]}:{val:.2f}", (WINDOW_WIDTH - 250 + i*80, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, col, 1) def run(self): try: self.last_update_time = time.time() while self.is_running: fr = self.tello.get_frame_read() if fr.frame is None: continue frame = cv2.resize(fr.frame.copy(), (WINDOW_WIDTH, WINDOW_HEIGHT)) self.update_telemetry(frame) # Face Detection (auf Originalbild) faces = self.face_cascade.detectMultiScale(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 1.2, 8) key = cv2.waitKey(1) & 0xFF if key == ord('q'): self.is_running = False elif key == ord('m'): self.is_manual = not self.is_manual elif key == ord('o'): self.is_orbiting = not self.is_orbiting elif key == ord('t'): print("[App] Manual Takeoff Triggered") self.tello.takeoff() elif key == ord('l'): self.tello.land() if self.is_manual: s = 60 self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0 if key == ord('w'): self.m_fb = s elif key == ord('s'): self.m_fb = -s elif key == ord('a'): self.m_lr = -s elif key == ord('d'): self.m_lr = s elif key == ord('r'): self.m_ud = s # R = UP elif key == ord('f'): self.m_ud = -s # F = DOWN elif key == ord('e'): self.m_yv = s # E = YAW RIGHT elif key == ord('z'): self.m_yv = -s # Z = YAW LEFT self.process_flight(faces) try: bat = int(self.tello.get_battery()) except: bat = 0 self.draw_hud(frame, bat) cv2.imshow("Tello AI Pilot (Stable Mode)", frame) except Exception as e: print(f"Error: {e}") finally: self.tello.land(); self.tello.end(); cv2.destroyAllWindows() if __name__ == "__main__": app = FaceTrackingApp() app.run()