263 lines
9.6 KiB
Python
263 lines
9.6 KiB
Python
from dataclasses import dataclass
|
|
import logging
|
|
import subprocess
|
|
import platform
|
|
import sys
|
|
import time
|
|
import socket
|
|
import cv2
|
|
import os
|
|
import numpy as np
|
|
|
|
@dataclass
|
|
class BackgroundFrameRead():
|
|
frame: cv2.typing.MatLike
|
|
|
|
class TelloSimClient:
|
|
def __init__(self, host='localhost', port=9999, auto_start_simulation=True):
|
|
self.host = host
|
|
self.port = port
|
|
self.latest_frame = None
|
|
if auto_start_simulation and not self._check_simulation_running():
|
|
self._start_simulation()
|
|
print("[Wrapper] Starting Tello Simulation...")
|
|
self._wait_for_simulation()
|
|
|
|
def _start_simulation(self):
|
|
sim_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tello_drone_sim.py'))
|
|
if platform.system() == "Windows":
|
|
command = f'start cmd /k python "{sim_path}"'
|
|
print("[DEBUG] Launching simulation command:", command)
|
|
subprocess.Popen(command, shell=True)
|
|
elif platform.system() == "Linux":
|
|
subprocess.Popen(['gnome-terminal', '--', 'python3', 'tello_drone_sim.py'])
|
|
elif platform.system() == "Darwin":
|
|
subprocess.Popen(['ls'])
|
|
subprocess.Popen(['pwd'])
|
|
python_path = os.path.join(os.path.dirname(sys.executable), 'python3')
|
|
print("Running python3 from path:", python_path)
|
|
subprocess.Popen([python_path, './tello_drone_sim.py'], cwd=os.getcwd(),
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
|
start_new_session=True)
|
|
else:
|
|
raise OSError("Unsupported OS for launching terminal simulation.")
|
|
|
|
def _check_simulation_running(self):
|
|
try:
|
|
with socket.create_connection((self.host, self.port), timeout=1):
|
|
return True
|
|
except (ConnectionRefusedError, socket.timeout, OSError) as ex:
|
|
logging.error("[Wrapper] Simulation is not running.", ex)
|
|
return False
|
|
|
|
def _wait_for_simulation(self, timeout=30):
|
|
print("[Wrapper] Waiting for simulation to become ready...")
|
|
start_time = time.time()
|
|
while time.time() - start_time < timeout:
|
|
if self._check_simulation_running():
|
|
print("[Wrapper] Simulation is now ready!")
|
|
return
|
|
time.sleep(1)
|
|
raise TimeoutError("[Error] Simulation did not become ready in time.")
|
|
|
|
def _send_command(self, command: str):
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.connect((self.host, self.port))
|
|
s.send(command.encode())
|
|
except ConnectionRefusedError:
|
|
print(f"[Error] Unable to connect to the simulation at {self.host}:{self.port}")
|
|
|
|
def get_frame_read(self) -> BackgroundFrameRead:
|
|
"""Get the latest frame directly from the simulator over TCP."""
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.connect((self.host, self.port))
|
|
s.send(b'get_latest_frame')
|
|
|
|
# Receive frame size (4 bytes)
|
|
size_data = s.recv(4)
|
|
if len(size_data) != 4:
|
|
print("[Error] Failed to receive frame size")
|
|
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
|
|
|
|
frame_size = int.from_bytes(size_data, byteorder='big')
|
|
|
|
# If size is 0, no frame available
|
|
if frame_size == 0:
|
|
print("[Debug] No frame available from simulator")
|
|
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
|
|
|
|
# Receive the frame data
|
|
frame_data = b''
|
|
bytes_received = 0
|
|
while bytes_received < frame_size:
|
|
chunk = s.recv(min(4096, frame_size - bytes_received))
|
|
if not chunk:
|
|
break
|
|
frame_data += chunk
|
|
bytes_received += len(chunk)
|
|
|
|
# Decode the frame from PNG bytes
|
|
if len(frame_data) == frame_size:
|
|
nparr = np.frombuffer(frame_data, np.uint8)
|
|
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
|
if image is not None:
|
|
# Return frame in BGR format (OpenCV's native format)
|
|
# Users should convert to RGB if needed for display
|
|
return BackgroundFrameRead(frame=image)
|
|
|
|
print("[Error] Failed to decode frame data")
|
|
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
|
|
|
|
except ConnectionRefusedError:
|
|
print(f"[Error] Unable to connect to the simulation at {self.host}:{self.port}")
|
|
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
|
|
except Exception as e:
|
|
print(f"[Error] Failed to get frame: {e}")
|
|
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
|
|
|
|
def _request_data(self, command):
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.connect((self.host, self.port))
|
|
s.send(command.encode())
|
|
return s.recv(4096).decode()
|
|
except ConnectionRefusedError:
|
|
print(f"[Error] Unable to retrieve '{command}' from {self.host}:{self.port}")
|
|
return "N/A"
|
|
|
|
def wait_until_motion_complete(self):
|
|
while self._request_data("get_is_moving") == "True":
|
|
time.sleep(0.1)
|
|
|
|
def get_battery(self):
|
|
return self._request_data('get_battery')
|
|
|
|
def get_distance_tof(self):
|
|
return self._request_data('get_distance_tof')
|
|
|
|
def get_height(self):
|
|
return self._request_data('get_height')
|
|
|
|
def get_flight_time(self):
|
|
return self._request_data('get_flight_time')
|
|
|
|
def get_speed_x(self):
|
|
return self._request_data('get_speed_x')
|
|
|
|
def get_speed_y(self):
|
|
return self._request_data('get_speed_y')
|
|
|
|
def get_speed_z(self):
|
|
return self._request_data('get_speed_z')
|
|
|
|
def get_acceleration_x(self):
|
|
return self._request_data('get_acceleration_x')
|
|
|
|
def get_acceleration_y(self):
|
|
return self._request_data('get_acceleration_y')
|
|
|
|
def get_acceleration_z(self):
|
|
return self._request_data('get_acceleration_z')
|
|
|
|
def get_pitch(self):
|
|
return self._request_data('get_pitch')
|
|
|
|
def get_roll(self):
|
|
return self._request_data('get_roll')
|
|
|
|
def get_yaw(self):
|
|
return self._request_data('get_yaw')
|
|
|
|
def query_attitude(self):
|
|
return self._request_data('query_attitude')
|
|
|
|
def get_current_state(self):
|
|
return self._request_data('get_current_state')
|
|
|
|
def connect(self):
|
|
self._send_command('connect')
|
|
|
|
def takeoff(self):
|
|
self._send_command('takeoff')
|
|
|
|
def land(self):
|
|
self._send_command('land')
|
|
|
|
def rotate_clockwise(self, degrees):
|
|
self._send_command(f'rotate_cw {degrees}')
|
|
|
|
def rotate_counter_clockwise(self, degrees):
|
|
self._send_command(f'rotate_ccw {degrees}')
|
|
|
|
def streamon(self):
|
|
self._send_command('streamon')
|
|
|
|
def streamoff(self):
|
|
self._send_command('streamoff')
|
|
|
|
def capture_frame(self):
|
|
self._send_command('capture_frame')
|
|
|
|
def emergency(self):
|
|
self._send_command('emergency')
|
|
|
|
def move_forward(self, distance):
|
|
self._send_command(f'forward {distance}')
|
|
|
|
def move_back(self, distance):
|
|
self._send_command(f'backward {distance}')
|
|
|
|
def move_left(self, distance):
|
|
self._send_command(f'left {distance}')
|
|
|
|
def move_right(self, distance):
|
|
self._send_command(f'right {distance}')
|
|
|
|
def move_up(self, distance):
|
|
self._send_command(f'up {distance}')
|
|
|
|
def move_down(self, distance):
|
|
self._send_command(f'down {distance}')
|
|
|
|
def flip_left(self):
|
|
self._send_command('flip_left')
|
|
|
|
def flip_right(self):
|
|
self._send_command('flip_right')
|
|
|
|
def flip_forward(self):
|
|
self._send_command('flip_forward')
|
|
|
|
def flip_back(self):
|
|
self._send_command('flip_back')
|
|
|
|
def go_xyz_speed(self, x, y, z, speed):
|
|
self._send_command(f"go {x} {y} {z} {speed}")
|
|
|
|
def curve_xyz_speed(self, x1, y1, z1, x2, y2, z2, speed):
|
|
self._send_command(f"curve {x1} {y1} {z1} {x2} {y2} {z2} {speed}")
|
|
|
|
def set_speed(self, speed):
|
|
self._send_command(f"set_speed {speed}")
|
|
|
|
def send_rc_control(self, left_right_velocity, forward_backward_velocity, up_down_velocity, yaw_velocity):
|
|
self._send_command(f"send_rc_control {left_right_velocity} {forward_backward_velocity} {up_down_velocity} {yaw_velocity}")
|
|
|
|
|
|
def end(self):
|
|
self._send_command('end')
|
|
|
|
def get_info(self):
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.connect((self.host, self.port))
|
|
s.send(b'get_info')
|
|
return s.recv(4096).decode()
|
|
except ConnectionRefusedError:
|
|
print(f"[Error] Unable to retrieve info from {self.host}:{self.port}")
|
|
return "{}"
|
|
|
|
def initiate_throw_takeoff(self):
|
|
self._send_command('throw_takeoff') |