import threading
import time
import traceback

import cv2
import numpy as np
from tello_sim_client import TelloSimClient

from .config import Config, Colors
from .vision import DroneVision
from .flight import FlightController
from .ui import HUD


class FaceTrackingApp:
    """Interactive face/person tracking application for a Tello drone.

    The main thread (``run``) grabs video frames, handles keyboard and mouse
    input, asks the ``FlightController`` for RC values and draws the HUD.
    A daemon thread (``ai_worker``) runs the vision models on the most recent
    frame and publishes its results through ``self.ai_results`` under
    ``self.ai_lock``.

    Detection boxes are handled as ``(x, y, w, h)`` sequences throughout.
    """

    # Minimum similarity for a detected person to count as the locked target.
    REID_MIN_SIMILARITY = 0.75
    # Weights used to blend the stored target features with a fresh match.
    REID_BLEND_OLD = 0.95
    REID_BLEND_NEW = 0.05
    # RC commands are evaluated at most once per this many seconds ...
    RC_MIN_INTERVAL = 0.1
    # ... and an unchanged command is re-sent after this many seconds.
    RC_HEARTBEAT_INTERVAL = 0.5
    # Fallback used when the ToF distance cannot be read.
    DEFAULT_TOF = 1000
    # Stick deflection applied by the manual-control keys.
    MANUAL_SPEED = 100
    # Yaw value used while the rotate toggle is active.
    ROTATE_SPEED = 60

    def __init__(self, use_real_tello: bool = False):
        """Connect to the drone, start the video stream and create the window.

        Args:
            use_real_tello: Forwarded to ``TelloSimClient``.
        """
        print(f"\n[System] Initializing Tello AI Pilot (Real Tello: {use_real_tello})")
        self.tello = TelloSimClient(use_real_tello=use_real_tello)
        self.tello.connect()
        self.tello.streamon()

        self.vision = DroneVision()
        self.flight_controller = FlightController()

        # State management
        self.is_running = True
        self.is_manual = True
        self.is_sport = False  # Sport mode flag
        self.is_locked = False
        self.is_taking_off = False
        self.is_flying = False
        self.takeoff_error = False
        self.locked_person = None
        self.locked_person_features = None
        self.lock_trigger = False
        self.emergency_stop = False

        # Manual override state
        self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0
        self.is_rotating = False

        self.current_height = 0.0
        self.current_yaw = 0.0
        self.target_altitude = Config.TARGET_ALTITUDE
        self.last_rc_time = 0.0
        self._last_heartbeat = 0.0
        self._prev_rc = [0, 0, 0, 0]

        # AI threading: ai_lock guards latest_frame and ai_results.
        self.ai_lock = threading.Lock()
        self.latest_frame = None
        self.ai_results = {
            "faces": [],
            "persons": [],
            "depth_map_vis": None,
            "zones": {"LEFT": False, "CENTER": False, "RIGHT": False},
            "zone_scores": {"LEFT": 0.0, "CENTER": 0.0, "RIGHT": 0.0},
            "reid_target_box": None,
        }

        cv2.namedWindow(Config.WIN_NAME, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(Config.WIN_NAME, Config.WIDTH, Config.HEIGHT)
        cv2.setMouseCallback(Config.WIN_NAME, self._on_mouse)

    @staticmethod
    def _box_contains(box, x, y) -> bool:
        """Return True if point (x, y) lies strictly inside ``box`` (x, y, w, h)."""
        bx, by, bw, bh = box
        return bx < x < bx + bw and by < y < by + bh

    @classmethod
    def _first_box_at(cls, boxes, x, y):
        """Return the first box in ``boxes`` containing (x, y), or None."""
        for box in boxes:
            if cls._box_contains(box, x, y):
                return box
        return None

    def _on_mouse(self, event, x, y, flags, param):
        """Mouse callback: left-click a person/face to lock on, elsewhere to unlock.

        Person boxes take priority over face boxes. ``flags`` and ``param``
        are unused; they are accepted because this method is registered with
        ``cv2.setMouseCallback``.
        """
        if event != cv2.EVENT_LBUTTONDOWN:
            return

        with self.ai_lock:
            faces = self.ai_results["faces"]
            persons = self.ai_results["persons"]
            frame = self.latest_frame.copy() if self.latest_frame is not None else None

        clicked_target = self._first_box_at(persons, x, y)
        if clicked_target is None:
            clicked_target = self._first_box_at(faces, x, y)

        # Compare against None explicitly: a box may be a NumPy array, whose
        # truth value is ambiguous, or a tuple of zeros, which is falsy.
        if clicked_target is not None and frame is not None:
            feat = self.vision.extract_person_features(frame, clicked_target)
            if feat is not None:
                print(f"[System] Manual Lock-ON at ({x}, {y})")
                self.locked_person = clicked_target
                self.locked_person_features = feat
                self.is_locked = True
                self.lock_trigger = False
                self.is_manual = False
        else:
            print("[System] Manual Unlock")
            self.is_locked = False
            self.locked_person = None
            self.locked_person_features = None

    def update_telemetry(self):
        """Refresh ``current_yaw`` and ``current_height`` from the drone.

        On any failure the previous values are kept.
        """
        try:
            self.current_yaw = float(self.tello.get_yaw())
            raw_h = float(self.tello.get_height())
            # Readings with magnitude >= 10 are scaled by 1/100 — presumably a
            # cm -> m conversion; confirm against the client's units.
            self.current_height = raw_h if abs(raw_h) < 10 else raw_h / 100.0
        except Exception:
            # Deliberate best-effort: this runs every frame, so a failed read
            # must not abort the loop (and is not logged to avoid spam).
            pass

    def _reidentify(self, frame, persons, target_features):
        """Find the person box that best matches ``target_features``.

        Returns:
            ``(box, features)`` of the best match when its similarity exceeds
            ``REID_MIN_SIMILARITY``, otherwise ``(None, None)``.
        """
        best_sim = 0
        best_box = None
        best_feat = None
        for pbox in persons:
            feat = self.vision.extract_person_features(frame, pbox)
            if feat is None:
                continue  # nothing to compare against for this box
            sim = self.vision.compare_features(target_features, feat)
            if sim > best_sim:
                best_sim, best_box, best_feat = sim, pbox, feat
        if best_sim > self.REID_MIN_SIMILARITY:
            return best_box, best_feat
        return None, None

    def ai_worker(self):
        """Background loop: run the vision models and publish ``ai_results``.

        Runs until ``is_running`` becomes False. Errors are printed and the
        loop backs off for one second before retrying.
        """
        while self.is_running:
            try:
                frame_to_process = None
                with self.ai_lock:
                    if self.latest_frame is not None:
                        frame_to_process = self.latest_frame.copy()

                if frame_to_process is not None:
                    faces = self.vision.detect_faces(frame_to_process)
                    persons = self.vision.detect_persons(frame_to_process)
                    self.vision.estimate_depth_and_radar(frame_to_process)

                    reid_match = None
                    # Snapshot the features: the UI thread may clear or replace
                    # them at any moment while this iteration is running.
                    target_features = self.locked_person_features
                    if self.is_locked and target_features is not None:
                        reid_match, matched_feat = self._reidentify(
                            frame_to_process, persons, target_features
                        )
                        # Only refresh the stored features if the lock was not
                        # changed meanwhile, so an unlock is never undone.
                        if (
                            matched_feat is not None
                            and self.locked_person_features is target_features
                        ):
                            self.locked_person_features = (
                                target_features * self.REID_BLEND_OLD
                                + matched_feat * self.REID_BLEND_NEW
                            )

                    with self.ai_lock:
                        self.ai_results["faces"] = faces
                        self.ai_results["persons"] = persons
                        self.ai_results["depth_map_vis"] = self.vision.depth_map_vis
                        self.ai_results["zones"] = self.vision.zones.copy()
                        self.ai_results["zone_scores"] = self.vision.zone_scores.copy()
                        self.ai_results["reid_target_box"] = reid_match

                time.sleep(0.01)
            except Exception:
                traceback.print_exc()
                time.sleep(1)

    def _try_auto_lock(self, frame, faces, persons):
        """Lock onto the person box that contains the centre of the largest face."""
        fx, fy, fw, fh = max(faces, key=lambda f: f[2] * f[3])
        fcx, fcy = fx + fw // 2, fy + fh // 2
        for pbox in persons:
            if not self._box_contains(pbox, fcx, fcy):
                continue
            feat = self.vision.extract_person_features(frame, pbox)
            if feat is not None:
                self.locked_person = pbox
                self.locked_person_features = feat
                self.is_locked = True
                self.lock_trigger = False
                self.is_manual = False
                break

    def _draw_target_overlay(self, frame):
        """Draw the locked-target box or, failing that, the smoothed face box."""
        if self.is_locked and self.locked_person is not None:
            (x, y, w, h) = self.locked_person
            cv2.rectangle(frame, (x, y), (x + w, y + h), Colors.BLUE, 3)
            cv2.putText(frame, "TARGET LOCKED", (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, Colors.BLUE, 2)
        elif self.flight_controller.smooth_face is not None:
            (x, y, w, h) = self.flight_controller.smooth_face
            cv2.rectangle(frame, (x, y), (x + w, y + h), Colors.GREEN, 2)
            cv2.putText(frame, "FACE FOUND", (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, Colors.GREEN, 2)

    def _send_rc(self, rc):
        """Send ``rc`` to the drone, rate-limited.

        At most one evaluation per ``RC_MIN_INTERVAL``; the command is sent
        when any channel changed by more than 1 or when
        ``RC_HEARTBEAT_INTERVAL`` has elapsed since the last send.
        """
        now = time.time()
        if now - self.last_rc_time < self.RC_MIN_INTERVAL:
            return
        changed = any(abs(rc[i] - self._prev_rc[i]) > 1 for i in range(4))
        heartbeat = now - self._last_heartbeat >= self.RC_HEARTBEAT_INTERVAL
        if changed or heartbeat:
            self.tello.send_rc_control(*rc)
            self._prev_rc = list(rc)
            self._last_heartbeat = now
        self.last_rc_time = now

    def _idle_status(self) -> str:
        """Return the HUD status text used when the flight logic is not active."""
        if self.is_taking_off:
            return "TAKING OFF..."
        if self.takeoff_error:
            return "TAKEOFF ERROR (LOW BAT?)"
        if self.emergency_stop:
            return "STOPPED"
        return "READY / ON GROUND"

    def run(self):
        """Run the main loop until ``is_running`` is cleared or an error occurs.

        Starts the AI worker thread, then per frame: publishes the frame,
        refreshes telemetry, applies lock logic, handles input, sends RC
        commands and draws the HUD. On exit it attempts to land, closes the
        drone connection and destroys the OpenCV windows.
        """
        ai_thread = threading.Thread(target=self.ai_worker, daemon=True)
        ai_thread.start()

        try:
            while self.is_running:
                fr = self.tello.get_frame_read()
                if fr is None or fr.frame is None:
                    time.sleep(0.01)
                    continue

                frame = cv2.resize(fr.frame.copy(), (Config.WIDTH, Config.HEIGHT))
                with self.ai_lock:
                    self.latest_frame = frame.copy()

                self.update_telemetry()

                with self.ai_lock:
                    faces = self.ai_results["faces"]
                    persons = self.ai_results["persons"]
                    depth_map_vis = self.ai_results["depth_map_vis"]
                    zones = self.ai_results["zones"]
                    zone_scores = self.ai_results["zone_scores"]
                    reid_box = self.ai_results["reid_target_box"]

                if self.is_locked and reid_box is not None:
                    self.locked_person = reid_box

                # len() rather than truthiness: detections may be NumPy arrays.
                if self.lock_trigger and len(faces) > 0 and len(persons) > 0:
                    self._try_auto_lock(frame, faces, persons)

                self._draw_target_overlay(frame)

                key = cv2.waitKey(1) & 0xFF
                self._handle_input(key)

                if self.is_flying and not self.is_taking_off and not self.emergency_stop:
                    try:
                        tof = int(self.tello.get_distance_tof())
                    except Exception:
                        tof = self.DEFAULT_TOF

                    active_manual_rc = (
                        self.m_lr,
                        self.m_fb,
                        self.m_ud,
                        self.ROTATE_SPEED if self.is_rotating else self.m_yv,
                    )
                    # NOTE(review): target_altitude is passed as 0.0 here (and
                    # to HUD.draw below) although self.target_altitude is set
                    # in __init__ — confirm whether that is intentional.
                    rc, status = self.flight_controller.calculate(
                        faces=faces,
                        is_manual=self.is_manual,
                        is_sport=self.is_sport,
                        emergency_stop=self.emergency_stop,
                        is_locked=self.is_locked,
                        locked_person=self.locked_person,
                        current_height=self.current_height,
                        target_altitude=0.0,
                        tof=tof,
                        zones=zones,
                        zone_scores=zone_scores,
                        manual_rc=active_manual_rc,
                    )
                    if self.is_sport:
                        status = "SPORT MODE: ACTIVE"
                    self._send_rc(rc)
                else:
                    if self.is_flying and not self.is_taking_off:
                        # Emergency stop while airborne: the last RC command
                        # stays in force until replaced, so explicitly command
                        # zero on all channels instead of going silent.
                        self._send_rc((0, 0, 0, 0))
                    status = self._idle_status()

                try:
                    bat = int(self.tello.get_battery())
                except Exception:
                    bat = 0

                HUD.draw(frame, status, self.emergency_stop, self.is_locked,
                         self.lock_trigger, self.current_height, 0.0,
                         self.current_yaw, bat, depth_map_vis, zones)
                cv2.imshow(Config.WIN_NAME, frame)
        except Exception:
            traceback.print_exc()
        finally:
            self.is_running = False
            try:
                self.tello.land()
            except Exception as exc:
                # Best-effort: landing may fail simply because we never took off.
                print(f"[System] Land on shutdown failed: {exc}")
            try:
                self.tello.end()
            finally:
                cv2.destroyAllWindows()

    def _handle_takeoff(self):
        """Start takeoff on a background thread unless already flying/taking off.

        Sets ``is_flying`` on success and ``takeoff_error`` on failure;
        ``is_taking_off`` is cleared in both cases.
        """
        if self.is_taking_off or self.is_flying:
            return
        self.is_taking_off = True
        self.takeoff_error = False

        def _task():
            try:
                print("[Flight] Takeoff command sent to SDK...")
                self.tello.takeoff()
                time.sleep(3)
                self.is_flying = True
                print("[Flight] In Air. Logic active.")
            except Exception as e:
                print(f"[Flight Error] Takeoff failed: {e}")
                self.takeoff_error = True
            finally:
                self.is_taking_off = False

        threading.Thread(target=_task, daemon=True).start()

    def _handle_input(self, key: int):
        """Apply one key code from ``cv2.waitKey`` to the application state.

        Mode keys: Enter (13) quits, Space (32) toggles emergency stop,
        ``m`` manual mode, ``2`` sport mode, ``k`` lock trigger, ``t`` takeoff,
        ``l`` land, ``1`` rotate toggle. The manual stick values are reset on
        every call and then set from ``w/s/a/d/r/f/e/z`` when manual mode is
        active and emergency stop is not.
        """
        if key == 13:
            self.is_running = False
        elif key == 32:
            self.emergency_stop = not self.emergency_stop
        elif key == ord('m'):
            self.is_manual = not self.is_manual
        elif key == ord('2'):
            self.is_sport = not self.is_sport  # Toggle sport mode
        elif key == ord('k'):
            self.lock_trigger = not self.lock_trigger
            self.is_locked = False
            # Clear the box as well, matching the mouse-click unlock.
            self.locked_person = None
            self.locked_person_features = None
        elif key == ord('t'):
            if not self.is_flying and not self.is_taking_off:
                self.emergency_stop = False
                self._handle_takeoff()
        elif key == ord('l'):
            self.is_flying = False
            self.is_taking_off = False
            try:
                self.tello.land()
            except Exception as exc:
                print(f"[Flight Error] Land failed: {exc}")
        elif key == ord('1'):
            self.is_rotating = not self.is_rotating

        # Sticks are momentary: reset every frame, then apply the pressed key.
        self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0
        if self.is_manual and not self.emergency_stop:
            s = self.MANUAL_SPEED
            if key == ord('w'):
                self.m_fb = s
            elif key == ord('s'):
                self.m_fb = -s
            elif key == ord('a'):
                self.m_lr = -s
            elif key == ord('d'):
                self.m_lr = s
            elif key == ord('r'):
                self.m_ud = s
            elif key == ord('f'):
                self.m_ud = -s
            elif key == ord('e'):
                self.m_yv = s
            elif key == ord('z'):
                self.m_yv = -s