Files
Sim-Tello-KI-fun/tello_sim_client.py
Sebastian Unterschütz 8c77744cad add KI
2026-04-21 10:58:12 +02:00

149 lines
5.4 KiB
Python

from dataclasses import dataclass
import subprocess
import platform
import sys
import time
import socket
import cv2
import os
import numpy as np
try:
from djitellopy import Tello
HAS_TELLO_LIB = True
except ImportError:
HAS_TELLO_LIB = False
@dataclass
class BackgroundFrameRead():
frame: cv2.typing.MatLike
class TelloSimClient:
def __init__(self, host='127.0.0.1', port=9999, auto_start_simulation=True, use_real_tello=False):
self.host = host
self.port = port
self.use_real_tello = use_real_tello
self.real_drone = None
if self.use_real_tello:
if HAS_TELLO_LIB:
print("[Client] Connecting to REAL Tello Hardware...")
self.real_drone = Tello()
else:
print("[Error] djitellopy not found! Falling back to Simulator.")
self.use_real_tello = False
if not self.use_real_tello:
if auto_start_simulation and not self._check_simulation_running():
self._start_simulation()
self._wait_for_simulation()
def connect(self):
if self.real_drone: self.real_drone.connect()
else: self._send_command('connect')
def takeoff(self):
if self.real_drone: self.real_drone.takeoff()
else: self._send_command('takeoff')
def land(self):
if self.real_drone: self.real_drone.land()
else: self._send_command('land')
def streamon(self):
if self.real_drone: self.real_drone.streamon()
else: self._send_command('streamon')
def streamoff(self):
if self.real_drone: self.real_drone.streamoff()
else: self._send_command('streamoff')
def get_frame_read(self):
if self.real_drone: return self.real_drone.get_frame_read()
return self # Simulator acts as frame provider via TCP in get_frame_read-style
@property
def frame(self):
"""Helper for simulator to match BackgroundFrameRead structure"""
return self._receive_tcp_frame()
def send_rc_control(self, lr, fb, ud, yaw):
if self.real_drone: self.real_drone.send_rc_control(int(lr), int(fb), int(ud), int(yaw))
else: self._send_command(f"send_rc_control {lr} {fb} {ud} {yaw}")
def get_yaw(self):
if self.real_drone: return self.real_drone.get_yaw()
return float(self._request_data('get_yaw') or 0)
def get_height(self):
if self.real_drone: return self.real_drone.get_height()
return float(self._request_data('get_height') or 0)
def get_distance_tof(self):
if self.real_drone: return self.real_drone.get_distance_tof()
return float(self._request_data('get_distance_tof') or 0)
def get_battery(self):
if self.real_drone: return self.real_drone.get_battery()
return int(self._request_data('get_battery') or 0)
def end(self):
if self.real_drone:
self.real_drone.streamoff()
else:
self._send_command('end')
# --- Internal Simulator Logic ---
def _start_simulation(self):
sim_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tello_sim', 'tello_drone_sim.py'))
print(f"[Wrapper] Launching Simulator: {sim_path}")
clean_env = os.environ.copy()
# Set Qt platform to xcb for better compatibility on Linux/Wayland
clean_env["QT_QPA_PLATFORM"] = "xcb"
# Clean PyCharm debugger vars
for k in list(clean_env.keys()):
if k.startswith('PYDEVD'): del clean_env[k]
subprocess.Popen([sys.executable, sim_path], cwd=os.path.dirname(sim_path),
env=clean_env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
def _check_simulation_running(self):
try:
with socket.create_connection((self.host, self.port), timeout=0.5): return True
except: return False
def _wait_for_simulation(self, timeout=15):
start = time.time()
while time.time() - start < timeout:
if self._check_simulation_running(): return
time.sleep(1)
print("[Warning] Simulator not responding.")
def _send_command(self, cmd):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port)); s.send(cmd.encode())
except: pass
def _request_data(self, cmd):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port)); s.send(cmd.encode())
return s.recv(1024).decode()
except: return None
def _receive_tcp_frame(self):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.send(b'get_latest_frame')
size_data = s.recv(4)
if not size_data: return None
size = int.from_bytes(size_data, 'big')
data = b''
while len(data) < size:
chunk = s.recv(min(size - len(data), 4096))
if not chunk: break
data += chunk
return cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
except: return None