266 lines
11 KiB
Python
266 lines
11 KiB
Python
import cv2
|
|
import numpy as np
|
|
from tello_sim_client import TelloSimClient
|
|
import time
|
|
import torch # Deep Learning
|
|
import torchvision.transforms as T
|
|
|
|
# --- Configuration ---
|
|
WINDOW_WIDTH = 960
|
|
WINDOW_HEIGHT = 720
|
|
TARGET_FACE_SIZE = 120
|
|
FACE_CENTER_X = WINDOW_WIDTH // 2
|
|
FACE_CENTER_Y = WINDOW_HEIGHT // 2
|
|
|
|
TARGET_ALTITUDE = 1.5 # meters
|
|
KP_ALTITUDE = 60
|
|
|
|
# Multi-Zone Radar & DL Config
|
|
OBSTACLE_THRESHOLD_TOF = 75
|
|
DEPTH_THRESHOLD = 0.75 # Höherer Schwellenwert (weniger empfindlich)
|
|
|
|
# --- Colors (BGR) ---
|
|
COLOR_GREEN = (0, 255, 0)
|
|
COLOR_RED = (0, 0, 255)
|
|
COLOR_BLUE = (255, 0, 0)
|
|
COLOR_WHITE = (255, 255, 255)
|
|
|
|
class FaceTrackingApp:
|
|
def __init__(self):
|
|
print("[App] Initializing TelloSimClient...")
|
|
self.tello = TelloSimClient()
|
|
self.tello.connect()
|
|
self.tello.streamon()
|
|
|
|
# Load Face Detector
|
|
cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
|
|
self.face_cascade = cv2.CascadeClassifier(cascade_path)
|
|
|
|
# Load MiDaS Model
|
|
print("[AI] Loading MiDaS DL Model...")
|
|
try:
|
|
self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
|
|
self.model = torch.hub.load("intel-isl/MiDaS", "MiDaS_small").to(self.device)
|
|
self.model.eval()
|
|
self.transform = torch.hub.load("intel-isl/MiDaS", "transforms").small_transform
|
|
except Exception as e:
|
|
print(f"[AI] Error: {e}")
|
|
self.model = None
|
|
|
|
self.is_running = True
|
|
self.is_manual = False
|
|
self.is_orbiting = False
|
|
|
|
self.zones = {"LEFT": False, "CENTER": False, "RIGHT": False}
|
|
self.zone_scores = {"LEFT": 0.0, "CENTER": 0.0, "RIGHT": 0.0}
|
|
self.status = "INIT"
|
|
self.depth_map_vis = None
|
|
|
|
# Position Tracking
|
|
self.pos_x, self.pos_z = 0.0, 0.0
|
|
self.path_history = [(0.0, 0.0)]
|
|
self.last_update_time = time.time()
|
|
self.current_yaw = 0.0
|
|
self.current_height = 0.0
|
|
|
|
# Velocity Tracking
|
|
self.last_sent_rc = [0, 0, 0, 0] # lr, fb, ud, yv
|
|
self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0
|
|
|
|
self.avoidance_active = False
|
|
self.avoidance_dir = 0 # -1 for right, 1 for left
|
|
self.center_blocked = False
|
|
self.search_start = time.time()
|
|
|
|
cv2.namedWindow("Tello AI Pilot (Stable Mode)", cv2.WINDOW_NORMAL)
|
|
cv2.resizeWindow("Tello AI Pilot (Stable Mode)", WINDOW_WIDTH, WINDOW_HEIGHT)
|
|
|
|
def estimate_depth(self, frame):
|
|
if self.model is None: return None
|
|
# Wir analysieren nur den mittleren Streifen (kein Boden, kein Himmel)
|
|
analysis_area = frame[200:520, :] # Fokus auf Augenhöhe
|
|
img = cv2.cvtColor(analysis_area, cv2.COLOR_BGR2RGB)
|
|
input_batch = self.transform(img).to(self.device)
|
|
with torch.no_grad():
|
|
pred = self.model(input_batch)
|
|
pred = torch.nn.functional.interpolate(pred.unsqueeze(1), size=analysis_area.shape[:2], mode="bicubic", align_corners=False).squeeze()
|
|
out = pred.cpu().numpy()
|
|
out_norm = cv2.normalize(out, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
|
|
self.depth_map_vis = cv2.applyColorMap(out_norm, cv2.COLORMAP_MAGMA)
|
|
return out / (np.max(out) + 1e-5)
|
|
|
|
def run_radar_scan(self, depth_map):
|
|
if depth_map is None: return
|
|
h, w = depth_map.shape
|
|
zone_w = w // 3
|
|
for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]):
|
|
zx_start, zx_end = i * zone_w, (i + 1) * zone_w
|
|
# Durchschnittliche Tiefe im Zentrum der Zone
|
|
score = np.mean(depth_map[h//4 : 3*h//4, zx_start:zx_end])
|
|
self.zone_scores[name] = score
|
|
self.zones[name] = score > DEPTH_THRESHOLD
|
|
|
|
def update_telemetry(self, frame):
|
|
try:
|
|
self.current_yaw = float(self.tello.get_yaw())
|
|
raw_height = float(self.tello.get_height())
|
|
# Auto-Scale detection (Simulator = m, Real = cm)
|
|
self.current_height = raw_height if abs(raw_height) < 10 else raw_height / 100.0
|
|
|
|
dm = self.estimate_depth(frame)
|
|
if dm is not None: self.run_radar_scan(dm)
|
|
|
|
# Dead Reckoning
|
|
now = time.time()
|
|
dt = now - self.last_update_time
|
|
self.last_update_time = now
|
|
|
|
# Simulator Scale (0.6 ist ein guter Annäherungswert für cm/s)
|
|
sf = 0.6
|
|
rad = np.radians(self.current_yaw)
|
|
lr, fb = self.last_sent_rc[0], self.last_sent_rc[1]
|
|
vx = fb * np.sin(rad) + lr * np.cos(rad)
|
|
vz = fb * np.cos(rad) - lr * np.sin(rad)
|
|
self.pos_x += vx * sf * dt
|
|
self.pos_z += vz * sf * dt
|
|
|
|
if np.linalg.norm(np.array([self.pos_x, self.pos_z]) - np.array(self.path_history[-1])) > 0.5:
|
|
self.path_history.append((self.pos_x, self.pos_z))
|
|
except: pass
|
|
|
|
def process_flight(self, faces):
|
|
lr, fb, ud, yv = 0, 0, 0, 0
|
|
|
|
# Terrain Following (Altitude in m)
|
|
if not self.is_manual:
|
|
ud = int(np.clip((TARGET_ALTITUDE - self.current_height) * KP_ALTITUDE, -40, 40))
|
|
|
|
# --- AI PILOT ---
|
|
# TOF Sensor Check
|
|
try: tof = int(self.tello.get_distance_tof())
|
|
except: tof = 1000
|
|
|
|
self.center_blocked = self.zones["CENTER"] or tof < OBSTACLE_THRESHOLD_TOF
|
|
|
|
if self.center_blocked:
|
|
self.status = "!!! OBSTACLE !!!"
|
|
fb = 0
|
|
|
|
# Stick to a direction if already avoiding
|
|
if not self.avoidance_active:
|
|
self.avoidance_dir = 1 if self.zone_scores["LEFT"] < self.zone_scores["RIGHT"] else -1
|
|
self.avoidance_active = True
|
|
|
|
yv = 45 * self.avoidance_dir
|
|
elif self.avoidance_active:
|
|
self.status = "RECOVERING"
|
|
fb = 15; yv = 20 * self.avoidance_dir
|
|
if not self.zones["CENTER"]:
|
|
self.avoidance_active = False
|
|
self.avoidance_dir = 0
|
|
|
|
elif self.is_manual:
|
|
self.status = "MANUAL"
|
|
lr, fb, ud, yv = self.m_lr, self.m_fb, self.m_ud, self.m_yv
|
|
|
|
elif len(faces) > 0:
|
|
self.search_start = time.time()
|
|
(x, y, w, h) = max(faces, key=lambda f: f[2] * f[3])
|
|
yv_err = (x + w // 2) - FACE_CENTER_X
|
|
yv = int(np.clip(0.18 * yv_err, -60, 60))
|
|
fb_raw = int(np.clip(0.5 * (TARGET_FACE_SIZE - w), -50, 50))
|
|
|
|
# Sanftere Reduzierung bei Drehung
|
|
fb = int(fb_raw * 0.5) if abs(yv_err) > 60 else fb_raw
|
|
|
|
if self.is_orbiting:
|
|
self.status = "AI ORBIT"; lr = 35
|
|
else:
|
|
self.status = "AI TRACKING"
|
|
else:
|
|
# SUCHE: "Walk and Look" Muster
|
|
elapsed = (time.time() - self.search_start) % 4.0
|
|
if elapsed < 2.8:
|
|
self.status = "SEARCH: FORWARD"
|
|
fb = 35; yv = 8
|
|
else:
|
|
self.status = "SEARCH: TURNING"
|
|
fb = 5; yv = 35
|
|
|
|
self.last_sent_rc = [lr, fb, ud, yv]
|
|
self.tello.send_rc_control(lr, fb, ud, yv)
|
|
|
|
def draw_hud(self, frame, b):
|
|
try:
|
|
if hasattr(self.tello, 'is_flying'):
|
|
state = "FLYING" if self.tello.is_flying else "LANDED"
|
|
else:
|
|
state = self.tello.get_current_state()
|
|
except: state = "ERROR"
|
|
|
|
if self.depth_map_vis is not None:
|
|
mini = cv2.resize(self.depth_map_vis, (160, 120))
|
|
frame[60:180, 20:180] = mini
|
|
cv2.rectangle(frame, (20, 60), (180, 180), COLOR_WHITE, 1)
|
|
|
|
cv2.rectangle(frame, (0, 0), (WINDOW_WIDTH, 60), (0,0,0), -1)
|
|
cv2.putText(frame, f"{self.status} | {state.upper()} | ALT: {self.current_height:.1f}m | BAT: {b}%", (20, 40),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLOR_GREEN if not self.center_blocked else COLOR_RED, 2)
|
|
|
|
cv2.putText(frame, f"RC: {self.last_sent_rc}", (20, 70),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_WHITE, 1)
|
|
|
|
# Radar Score Debugging
|
|
for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]):
|
|
col = COLOR_RED if self.zones[name] else COLOR_GREEN
|
|
val = self.zone_scores[name]
|
|
cv2.putText(frame, f"{name[:1]}:{val:.2f}", (WINDOW_WIDTH - 250 + i*80, 40),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, col, 1)
|
|
|
|
def run(self):
|
|
try:
|
|
self.last_update_time = time.time()
|
|
while self.is_running:
|
|
fr = self.tello.get_frame_read()
|
|
if fr.frame is None: continue
|
|
frame = cv2.resize(fr.frame.copy(), (WINDOW_WIDTH, WINDOW_HEIGHT))
|
|
self.update_telemetry(frame)
|
|
|
|
# Face Detection (auf Originalbild)
|
|
faces = self.face_cascade.detectMultiScale(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 1.2, 8)
|
|
|
|
key = cv2.waitKey(1) & 0xFF
|
|
if key == ord('q'): self.is_running = False
|
|
elif key == ord('m'): self.is_manual = not self.is_manual
|
|
elif key == ord('o'): self.is_orbiting = not self.is_orbiting
|
|
elif key == ord('t'):
|
|
print("[App] Manual Takeoff Triggered")
|
|
self.tello.takeoff()
|
|
elif key == ord('l'): self.tello.land()
|
|
|
|
if self.is_manual:
|
|
s = 60
|
|
self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0
|
|
if key == ord('w'): self.m_fb = s
|
|
elif key == ord('s'): self.m_fb = -s
|
|
elif key == ord('a'): self.m_lr = -s
|
|
elif key == ord('d'): self.m_lr = s
|
|
elif key == ord('r'): self.m_ud = s # R = UP
|
|
elif key == ord('f'): self.m_ud = -s # F = DOWN
|
|
elif key == ord('e'): self.m_yv = s # E = YAW RIGHT
|
|
elif key == ord('z'): self.m_yv = -s # Z = YAW LEFT
|
|
|
|
self.process_flight(faces)
|
|
try: bat = int(self.tello.get_battery())
|
|
except: bat = 0
|
|
|
|
self.draw_hud(frame, bat)
|
|
cv2.imshow("Tello AI Pilot (Stable Mode)", frame)
|
|
except Exception as e: print(f"Error: {e}")
|
|
finally:
|
|
self.tello.land(); self.tello.end(); cv2.destroyAllWindows()
|
|
|
|
if __name__ == "__main__":
|
|
app = FaceTrackingApp()
|
|
app.run()
|