Compare commits

..

23 Commits

Author SHA1 Message Date
Sebastian Unterschütz
e72ba2989e add models 2026-04-21 15:45:43 +02:00
Sebastian Unterschütz
f5d344cf75 add KI 2026-04-21 11:35:04 +02:00
Sebastian Unterschütz
8c77744cad add KI 2026-04-21 10:58:12 +02:00
Sebastian Unterschütz
37f98ccb86 add own stuff 2026-04-15 21:33:01 +02:00
Andrew Johnson
3aadf562fd Add Apache License 2.0
Added the Apache License 2.0 to the project.
2025-11-23 14:20:29 +01:00
Andrew Johnson
f6035231c0 Move the client out of the simulator 2025-11-22 09:44:34 +01:00
Andrew Johnson
d100f7e820 Add space 2025-11-15 14:02:10 +01:00
Andrew Johnson
944b225a81 Remove collider none 2025-11-15 12:28:47 +01:00
Andrew Johnson
f22a171c80 Add conda as a reference 2025-11-15 12:08:52 +01:00
Andrew Johnson
e5b36e4e96 Add extra trusted domain 2025-11-15 12:05:02 +01:00
Andrew Johnson
bfa9653bc8 Add more docs on venv 2025-11-15 11:45:29 +01:00
Andrew Johnson
63367bab8c Lock in python package versions 2025-11-15 11:11:19 +01:00
Andrew Johnson
3f6049c355 Clean code 2025-11-09 13:38:42 +01:00
Andrew Johnson
6503a5c5f4 Merge commit 'edebc795d9df8936dd9311b06bdb06de0fe5d9cb' 2025-11-09 13:31:30 +01:00
Andrew Johnson
edebc795d9 Update examples/5_take_a_picture.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-11-09 13:31:08 +01:00
Andrew Johnson
a10c1b5bc5 Merge remote-tracking branch 'origin/copilot/sub-pr-2' into andrew/add-dynamic-dev-container-config 2025-11-09 13:30:50 +01:00
copilot-swe-agent[bot]
3e10e3e621 Fix cross-platform errno compatibility
Co-authored-by: anjrew <26453863+anjrew@users.noreply.github.com>
2025-11-09 12:30:13 +00:00
copilot-swe-agent[bot]
3f387756ab Initial plan 2025-11-09 12:27:42 +00:00
Andrew Johnson
fb44f33bea Update to socket based solution 2025-11-09 13:22:41 +01:00
Andrew Johnson
73454a03c8 Remove files on disk 2025-11-09 13:10:05 +01:00
Andrew Johnson
3cac34355f Add sim recording files 2025-11-09 13:00:41 +01:00
Andrew Johnson
a513489835 Add socket cleanup 2025-11-09 12:52:49 +01:00
Andrew Johnson
55b42d9ece Clean up docs 2025-11-09 12:29:34 +01:00
41 changed files with 2890 additions and 360 deletions

1
.gitignore vendored
View File

@@ -87,3 +87,4 @@ cython_debug/
.devcontainer/devcontainer.json
output/
artifacts/

11
.vscode/launch.json vendored
View File

@@ -4,6 +4,17 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run Tello Simulator",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/tello_sim/run_sim.py",
"console": "integratedTerminal",
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
},
{
"name": "Python Debugger: Current File",
"type": "debugpy",

201
LICENSE Normal file
View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

112
README.md
View File

@@ -1,72 +1,72 @@
# Tello Drone Sim
# Tello AI Pilot (Extreme Performance Edition)
This is a simple simulation of a Tello drone using Ursina. The drone can be controlled via tcp calls.
Dieses Projekt ist eine hochoptimierte KI-Steuerung für die DJI Tello Drohne (und den integrierten Simulator). Es nutzt modernste Computer-Vision-Modelle via ONNX Runtime, um Personen zu verfolgen, Hindernissen auszuweichen und Ziele intelligent wiederzuerkennen.
In the repo there is the simulation server along with a client class that can be used to interact with the sim server
## 🚀 Hauptmerkmale
- **Hybrid AI Engine**: Nutzt **YuNet** (Face Detection), **MiDaS** (Depth Estimation) und **MobileNetV3** (ReID) alle via **ONNX Runtime** für maximale FPS.
- **Sport-Modus**: Aggressives 3-Achsen-Tracking für schnelle Verfolgungsjagden.
- **Visual Fingerprinting (ReID)**: Erkennt fixierte Personen wieder, auch wenn sie das Bild kurz verlassen.
- **Multithreading**: Flüssiges Kamerabild durch Trennung von Video-Stream und KI-Logik.
- **Simulator inklusive**: Eine integrierte Ursina-Umgebung zum gefahrlosen Testen.
![Features](./images/Features.gif)
## Setup
### Option 1: Dev Container (Recommended)
The easiest way to get started is using the provided dev container which includes all dependencies and GUI support:
1. **Setup the dev container for your platform:**
```bash
.devcontainer/setup.sh
```
This will auto-detect your platform (macOS, Linux, Windows, WSL) and generate the appropriate `devcontainer.json`.
2. **Open in VS Code:**
- Install the "Dev Containers" extension
- Open Command Palette (Cmd/Ctrl + Shift + P)
- Run "Dev Containers: Reopen in Container"
3. **Platform-specific requirements:**
- **macOS**: Install XQuartz (`brew install --cask xquartz`) and run `xhost +localhost`
- **Linux**: X11 forwarding should work out of the box
- **Windows**: Access GUI via VNC at `http://localhost:5901` (password: `vncpass`)
### Option 2: Manual Setup
If you prefer to set up the environment manually:
1. Create the virtual environment by running:
```bash
python3 -m venv venv
```
2. Activate the virtual environment by running:
```bash
source venv/bin/activate
```
3. Install the required packages by running:
## 🛠 Installation
1. **Python-Umgebung**: Empfohlen wird Python 3.10 bis 3.12.
2. **Abhängigkeiten installieren**:
```bash
pip install -r requirements.txt
```
3. **Modelle prüfen**: Stelle sicher, dass im Ordner `models/` folgende Dateien liegen:
- `face_detection_yunet.onnx`
- `midas_small.onnx`
- `reid_mobilenet.onnx`
4. Export the python path by running:
## 🚀 Starten der Drohne
### A) Im Simulator (Standard)
1. Öffne die Datei `run.py`.
2. Stelle sicher, dass `use_real_tello=False` eingestellt ist.
3. Starte das Programm:
```bash
export PYTHONPATH=$PWD
python run.py
```
4. Das Simulator-Fenster und das KI-Pilot-Fenster öffnen sich automatisch.
### B) Mit der echten DJI Tello
1. Schalte deine Tello ein und verbinde deinen PC mit dem WLAN der Drohne (z.B. `TELLO-XXXXXX`).
2. Öffne die Datei `run.py` und ändere die Zeile zu:
```python
app = FaceTrackingApp(use_real_tello=True)
```
3. Starte das Programm:
```bash
python run.py
```
## 🕹 Bedienung im Flug
## Running the simulation
Sobald das Video-Fenster erscheint:
1. **Klicke mit der Maus** in das Fenster, um den Fokus zu aktivieren.
2. Drücke **'T'** zum Starten (Takeoff).
3. **Tracking aktivieren**: Klicke mit der Maus auf ein erkanntes Gesicht/Person oder drücke **'K'**, um den nächsten erkannten Kopf zu fixieren (Lock-ON).
To run the simulation, run the following command:
### Wichtige Tasten:
| Taste | Aktion |
| :--- | :--- |
| **T** | **Takeoff** (Abheben) |
| **L** | **Land** (Landen) |
| **2** | **Sport-Modus** (An/Aus) - Schnelleres Tracking auf allen Achsen |
| **M** | **Manueller Modus** (KI stoppt die Steuerung) |
| **Space**| **Not-Aus** (Motoren sofort aus) |
| **W/S/A/D**| Vorwärts, Rückwärts, Links, Rechts (Manuell) |
| **R/F** | Steigen / Sinken (Manuell) |
| **E/Z** | Drehen Links / Rechts (Manuell) |
| **Enter**| Programm sicher beenden |
```bash
python tello_sim/run_sim.py
```
## 🏗 Architektur & Performance
- **AI-Worker Thread**: Verarbeitet alle Modelle parallel zum Haupt-Thread.
- **Rate Limiting**: RC-Befehle werden mit 10Hz gesendet, um das SDK-Protokoll nicht zu überlasten.
- **ONNX Acceleration**: Nutzt CPU-Optimierungen, die bis zu 5x schneller als Standard-PyTorch sind.
You can try running some of the [examples](./examples) to see how the simulation works. The examples are located in the `examples` folder.
Or use the [client](./tello_sim/tello_sim_client.py) class to interact with the simulation server. The client class is located in the `tello_sim` folder.
---
*Viel Spaß beim Fliegen! Achte bei der echten Drohne immer auf genügend Platz und einen vollen Akku (>15%).*

View File

@@ -0,0 +1 @@
<?xml version='1.0' encoding='UTF-8'?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Details>No such object: mediapipe-models/face_detector/blaze_face_long_range/float16/latest/blaze_face_long_range.tflite</Details></Error>

1
drone_pilot/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Init for drone_pilot package

35
drone_pilot/config.py Normal file
View File

@@ -0,0 +1,35 @@
# drone_pilot/config.py
class Config:
    """Central tuning constants for the Tello AI pilot.

    Groups window geometry, pursuit targets, control-loop gains for the
    normal and sport flight modes, and detection thresholds.
    """

    # --- Window / display ---
    WIN_NAME = "Tello AI Pilot v2.0 (Extreme Speed)"
    WIDTH = 1024
    HEIGHT = 720

    # --- Pursuit targets ---
    TARGET_ALTITUDE = 1.5        # desired hover altitude in metres
    TARGET_FACE_SIZE = 180       # desired face box width in pixels
    TARGET_PERSON_SIZE = 400     # desired person box width in pixels
    ALT_THRESHOLD = 0.12         # altitude deadband in metres

    # Normal Mode Gains
    YAW_GAIN = 0.12
    FORWARD_GAIN = 1.5
    ALT_GAIN = 40

    # Sport Mode Gains (Much more aggressive)
    SPORT_YAW_GAIN = 0.25
    SPORT_FB_GAIN = 2.0
    SPORT_LR_GAIN = 0.6

    # --- Obstacle / detection thresholds ---
    DEPTH_THRESHOLD = 0.90       # normalised depth above which a zone counts as blocked
    OBSTACLE_TOF_CM = 70         # time-of-flight distance (cm) treated as an obstacle
    FACE_DEADZONE = 20           # horizontal pixel error tolerated without yawing
    FACE_ROT_ONLY = 80           # pixel error beyond which only rotation is applied
    PERSON_CONF_THRESHOLD = 0.5  # minimum detector confidence for a person box
    SMOOTHING_ALPHA = 0.35       # low-pass factor for outgoing RC commands
class Colors:
    """BGR colour tuples used by the OpenCV overlay drawing code."""

    GREEN = (0, 255, 0)
    RED = (0, 0, 255)
    BLUE = (255, 0, 0)
    PURPLE = (255, 0, 255)
    WHITE = (255, 255, 255)
    BLACK = (0, 0, 0)
    HUD_BG = (10, 10, 10)  # near-black background for HUD banners

133
drone_pilot/flight.py Normal file
View File

@@ -0,0 +1,133 @@
import time
import numpy as np
from typing import List, Tuple, Dict
from .config import Config
class FlightController:
    """
    Computes the RC flight vector from the AI detection results.

    Supports a normal mode (sequential: aim first, then approach) and a
    sport mode (all axes driven simultaneously, including lateral strafing).
    All outgoing commands pass through a low-pass filter (`_smooth`) fed by
    the previously emitted command.
    """
    def __init__(self):
        self.last_sent_rc = [0, 0, 0, 0]  # last smoothed RC command; feeds the low-pass filter
        self.smooth_face = None           # exponentially smoothed face box (x, y, w, h) or None
        self.search_start = time.time()   # start timestamp of the current patrol cycle
        self.status = "INITIALIZING"
        # Memory for targets that went out of view
        self.last_target_side = 0         # +1 = target was right of centre, -1 = left
        self.lost_time = 0                # timestamp when the locked target was lost (0 = not lost)

    def calculate(self,
                  faces: List[Tuple],
                  is_manual: bool,
                  is_sport: bool,  # NEW: sport-mode flag
                  emergency_stop: bool,
                  is_locked: bool,
                  locked_person: Tuple,
                  current_height: float,
                  target_altitude: float,
                  tof: int,
                  zones: Dict[str, bool],
                  zone_scores: Dict[str, float],
                  manual_rc: Tuple[int, int, int, int]) -> Tuple[Tuple[int, int, int, int], str]:
        """Compute one RC command ((lr, fb, ud, yaw), status_string).

        Priority order: emergency stop > obstacle avoidance > manual
        control > locked-target pursuit > face aiming > patrol.
        NOTE(review): current_height and target_altitude are accepted but
        not used by any branch below — altitude hold appears unimplemented.
        """
        lr, fb, ud, yv = 0, 0, 0, 0
        # Smooth the largest detected face (80/20 blend) to damp detector jitter.
        if len(faces) > 0:
            target = max(faces, key=lambda f: f[2] * f[3])
            if self.smooth_face is None: self.smooth_face = target
            else:
                self.smooth_face = tuple(int(self.smooth_face[i]*0.8 + target[i]*0.2) for i in range(4))
        else:
            self.smooth_face = None
        if emergency_stop:
            self.status = "EMERGENCY STOP"
            return (0, 0, 0, 0), self.status
        # Obstacle avoidance: back off and yaw toward the clearer side.
        center_blocked = zones["CENTER"] or tof < Config.OBSTACLE_TOF_CM
        if center_blocked:
            self.status = "AVOIDING OBSTACLE"
            yv = 80 if zone_scores["LEFT"] < zone_scores["RIGHT"] else -80
            fb = -30
            return self._smooth(0, fb, 0, yv)
        if is_manual:
            self.status = "MANUAL CONTROL"
            lr, fb, m_ud, yv = manual_rc
            if abs(m_ud) > 0: ud = m_ud
            return self._smooth(lr, fb, ud, yv)
        # AI LOGIC
        if is_locked:
            if locked_person is not None:
                self.search_start = time.time()
                self.lost_time = 0
                (x, y, w, h) = locked_person
                center_x = x + w // 2
                err_x = center_x - (Config.WIDTH // 2)
                # Remember which side the target was on, for the scan-after-loss phase.
                self.last_target_side = 1 if err_x > 0 else -1
                if is_sport:
                    # SPORT MODE: all axes simultaneously + lateral strafing
                    yv = int(np.clip(Config.SPORT_YAW_GAIN * err_x, -100, 100))
                    fb = int(np.clip(Config.SPORT_FB_GAIN * (Config.TARGET_PERSON_SIZE - w), -100, 100))
                    lr = int(np.clip(Config.SPORT_LR_GAIN * err_x, -60, 60))
                    self.status = "SPORT PURSUIT: FULL AXIS"
                else:
                    # NORMAL MODE: sequential (turn OR fly, never both)
                    if abs(err_x) > Config.FACE_DEADZONE:
                        yv = int(np.clip(Config.YAW_GAIN * err_x, -50, 50))
                        fb = 0
                        self.status = "PURSUIT: AIMING"
                    else:
                        yv = 0
                        fb = int(np.clip(Config.FORWARD_GAIN * (Config.TARGET_PERSON_SIZE - w), -80, 80))
                        self.status = "PURSUIT: APPROACHING"
            else:
                # Target lost: scan toward its last known side for 10 s, then patrol-spin.
                if self.lost_time == 0: self.lost_time = time.time()
                elapsed = time.time() - self.lost_time
                search_speed = 60 if is_sport else 40
                if elapsed < 10.0:
                    yv = search_speed * self.last_target_side
                    self.status = f"LOST TARGET: SCANNING {'RIGHT' if self.last_target_side > 0 else 'LEFT'}"
                else:
                    self.status = "TARGET LOST: PATROL"
                    yv = 30
        elif self.smooth_face is not None:
            # A face is visible but not locked: keep it centred by yawing only.
            (x, y, w, h) = self.smooth_face
            err_x = (x + w // 2) - (Config.WIDTH // 2)
            yv = int(np.clip(Config.YAW_GAIN * err_x, -40, 40))
            self.status = "AWAITING LOCK"
        else:
            # Patrol: 8-second cycle of 3 s advancing, 5 s scanning.
            elapsed = (time.time() - self.search_start) % 8.0
            if elapsed < 3.0:
                self.status = "PATROL: ADVANCE"
                fb = 35
            else:
                self.status = "PATROL: SCAN"
                yv = 35
        return self._smooth(lr, fb, ud, yv)

    def _smooth(self, lr, fb, ud, yv):
        """Low-pass filter the raw command against the last emitted one and
        zero out residual values below 3 to avoid drift; returns
        ((lr, fb, ud, yaw), status)."""
        alpha = Config.SMOOTHING_ALPHA
        slr = int(self.last_sent_rc[0] * (1-alpha) + lr * alpha)
        sfb = int(self.last_sent_rc[1] * (1-alpha) + fb * alpha)
        sud = int(self.last_sent_rc[2] * (1-alpha) + ud * alpha)
        syv = int(self.last_sent_rc[3] * (1-alpha) + yv * alpha)
        if abs(slr) < 3: slr = 0
        if abs(sfb) < 3: sfb = 0
        if abs(sud) < 3: sud = 0
        if abs(syv) < 3: syv = 0
        self.last_sent_rc = [slr, sfb, sud, syv]
        return (slr, sfb, sud, syv), self.status

303
drone_pilot/main.py Normal file
View File

@@ -0,0 +1,303 @@
import cv2
import numpy as np
import threading
import time
import traceback
from tello_sim_client import TelloSimClient
from .config import Config, Colors
from .vision import DroneVision
from .flight import FlightController
from .ui import HUD
class FaceTrackingApp:
    """Main application: pulls frames from the Tello (real or simulated),
    runs the vision models on a background thread, computes flight commands
    via FlightController and renders the HUD with OpenCV.

    Keyboard: T takeoff, L land, M manual toggle, 2 sport mode, K lock
    trigger, 1 rotate toggle, Space emergency stop, Enter quit,
    W/S/A/D + R/F + E/Z manual axes. Mouse left-click locks/unlocks a target.
    """
    def __init__(self, use_real_tello: bool = False):
        print(f"\n[System] Initializing Tello AI Pilot (Real Tello: {use_real_tello})")
        self.tello = TelloSimClient(use_real_tello=use_real_tello)
        self.tello.connect()
        self.tello.streamon()
        self.vision = DroneVision()
        self.flight_controller = FlightController()
        # State Management
        self.is_running = True
        self.is_manual = True
        self.is_sport = False  # NEW: Sport Mode flag
        self.is_locked = False
        self.is_taking_off = False
        self.is_flying = False
        self.takeoff_error = False
        self.locked_person = None           # (x, y, w, h) of the currently locked target
        self.locked_person_features = None  # ReID feature vector of the locked target
        self.lock_trigger = False           # 'K' pressed: lock the next detected face
        self.emergency_stop = False
        # Manual Override States
        self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0
        self.is_rotating = False
        self.current_height = 0.0
        self.current_yaw = 0.0
        self.target_altitude = Config.TARGET_ALTITUDE
        self.last_rc_time = 0.0       # last time an RC send was considered (10 Hz limiter)
        self._last_heartbeat = 0.0    # last time an RC command was actually sent
        self._prev_rc = [0, 0, 0, 0]  # last RC command sent, for change detection
        # AI Threading: the worker writes and the UI loop reads, both under ai_lock.
        self.ai_lock = threading.Lock()
        self.latest_frame = None
        self.ai_results = {
            "faces": [],
            "persons": [],
            "depth_map_vis": None,
            "zones": {"LEFT": False, "CENTER": False, "RIGHT": False},
            "zone_scores": {"LEFT": 0.0, "CENTER": 0.0, "RIGHT": 0.0},
            "reid_target_box": None
        }
        cv2.namedWindow(Config.WIN_NAME, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(Config.WIN_NAME, Config.WIDTH, Config.HEIGHT)
        cv2.setMouseCallback(Config.WIN_NAME, self._on_mouse)

    def _on_mouse(self, event, x, y, flags, param):
        """Left click: lock onto the person/face box under the cursor, or
        unlock when the click hits neither."""
        if event == cv2.EVENT_LBUTTONDOWN:
            # Snapshot shared AI state under the lock.
            with self.ai_lock:
                faces = self.ai_results["faces"]
                persons = self.ai_results["persons"]
                frame = self.latest_frame.copy() if self.latest_frame is not None else None
            clicked_target = None
            # Prefer a full-person box over a face box.
            for pbox in persons:
                px, py, pw, ph = pbox
                if px < x < px+pw and py < y < py+ph:
                    clicked_target = pbox; break
            if not clicked_target:
                for fbox in faces:
                    fx, fy, fw, fh = fbox
                    if fx < x < fx+fw and fy < y < fy+fh:
                        clicked_target = fbox; break
            if clicked_target and frame is not None:
                feat = self.vision.extract_person_features(frame, clicked_target)
                if feat is not None:
                    print(f"[System] Manual Lock-ON at ({x}, {y})")
                    self.locked_person = clicked_target
                    self.locked_person_features = feat
                    self.is_locked = True
                    self.lock_trigger = False
                    self.is_manual = False
            else:
                # Click on empty space: release the lock.
                print("[System] Manual Unlock")
                self.is_locked = False
                self.locked_person = None
                self.locked_person_features = None

    def update_telemetry(self):
        """Poll yaw and height from the drone (best effort; errors ignored)."""
        try:
            self.current_yaw = float(self.tello.get_yaw())
            raw_h = float(self.tello.get_height())
            # Values >= 10 are presumably centimetres (real Tello) and are
            # converted to metres — TODO confirm against TelloSimClient.
            self.current_height = raw_h if abs(raw_h) < 10 else raw_h / 100.0
        except: pass  # NOTE(review): bare except silently hides telemetry failures

    def ai_worker(self):
        """Background thread: runs detection, depth estimation and ReID on
        the most recent frame and publishes results into self.ai_results."""
        while self.is_running:
            try:
                frame_to_process = None
                with self.ai_lock:
                    if self.latest_frame is not None:
                        frame_to_process = self.latest_frame.copy()
                if frame_to_process is not None:
                    faces = self.vision.detect_faces(frame_to_process)
                    persons = self.vision.detect_persons(frame_to_process)
                    self.vision.estimate_depth_and_radar(frame_to_process)
                    reid_match = None
                    if self.is_locked and self.locked_person_features is not None:
                        # Re-identify the locked person among current detections.
                        max_sim = 0
                        for pbox in persons:
                            feat = self.vision.extract_person_features(frame_to_process, pbox)
                            sim = self.vision.compare_features(self.locked_person_features, feat)
                            if sim > max_sim:
                                max_sim = sim
                                reid_match = pbox
                        if max_sim > 0.75:
                            # Confident match: slowly adapt the stored feature vector (95/5 blend).
                            new_feat = self.vision.extract_person_features(frame_to_process, reid_match)
                            if new_feat is not None:
                                self.locked_person_features = self.locked_person_features * 0.95 + new_feat * 0.05
                        else:
                            reid_match = None
                    # Publish results atomically for the UI loop.
                    with self.ai_lock:
                        self.ai_results["faces"] = faces
                        self.ai_results["persons"] = persons
                        self.ai_results["depth_map_vis"] = self.vision.depth_map_vis
                        self.ai_results["zones"] = self.vision.zones.copy()
                        self.ai_results["zone_scores"] = self.vision.zone_scores.copy()
                        self.ai_results["reid_target_box"] = reid_match
                time.sleep(0.01)
            except Exception:
                # NOTE(review): worker errors are swallowed without logging.
                time.sleep(1)

    def run(self):
        """Main UI loop: grab a frame, draw overlays, handle keys, send RC."""
        ai_thread = threading.Thread(target=self.ai_worker, daemon=True)
        ai_thread.start()
        try:
            while self.is_running:
                fr = self.tello.get_frame_read()
                if fr is None or fr.frame is None:
                    time.sleep(0.01); continue
                frame = cv2.resize(fr.frame.copy(), (Config.WIDTH, Config.HEIGHT))
                with self.ai_lock:
                    self.latest_frame = frame.copy()
                self.update_telemetry()
                # Snapshot the AI worker's latest results.
                with self.ai_lock:
                    faces = self.ai_results["faces"]
                    persons = self.ai_results["persons"]
                    depth_map_vis = self.ai_results["depth_map_vis"]
                    zones = self.ai_results["zones"]
                    zone_scores = self.ai_results["zone_scores"]
                    reid_box = self.ai_results["reid_target_box"]
                if self.is_locked and reid_box is not None:
                    self.locked_person = reid_box
                # 'K' lock trigger: lock the person whose box contains the
                # centre of the largest detected face.
                if self.lock_trigger and len(faces) > 0 and len(persons) > 0:
                    (fx, fy, fw, fh) = max(faces, key=lambda f: f[2]*f[3])
                    fcx, fcy = fx + fw//2, fy + fh//2
                    for pbox in persons:
                        px, py, pw, ph = pbox
                        if px < fcx < px+pw and py < fcy < py+ph:
                            feat = self.vision.extract_person_features(frame, pbox)
                            if feat is not None:
                                self.locked_person = pbox
                                self.locked_person_features = feat
                                self.is_locked = True
                                self.lock_trigger = False
                                self.is_manual = False
                            break
                # UI Graphics
                if self.is_locked and self.locked_person:
                    (x,y,w,h) = self.locked_person
                    cv2.rectangle(frame, (x,y), (x+w,y+h), Colors.BLUE, 3)
                    cv2.putText(frame, "TARGET LOCKED", (x,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, Colors.BLUE, 2)
                elif self.flight_controller.smooth_face is not None:
                    (x,y,w,h) = self.flight_controller.smooth_face
                    cv2.rectangle(frame, (x,y), (x+w,y+h), Colors.GREEN, 2)
                    cv2.putText(frame, "FACE FOUND", (x,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, Colors.GREEN, 2)
                key = cv2.waitKey(1) & 0xFF
                self._handle_input(key)
                if self.is_flying and not self.is_taking_off and not self.emergency_stop:
                    try: tof = int(self.tello.get_distance_tof())
                    except: tof = 1000  # no ToF reading: assume the path is clear
                    active_manual_rc = (self.m_lr, self.m_fb, self.m_ud, 60 if self.is_rotating else self.m_yv)
                    rc, status = self.flight_controller.calculate(
                        faces=faces,
                        is_manual=self.is_manual,
                        is_sport=self.is_sport,  # NEW
                        emergency_stop=self.emergency_stop,
                        is_locked=self.is_locked,
                        locked_person=self.locked_person,
                        current_height=self.current_height,
                        # NOTE(review): passes 0.0 instead of self.target_altitude — confirm intended.
                        target_altitude=0.0,
                        tof=tof,
                        zones=zones,
                        zone_scores=zone_scores,
                        manual_rc=active_manual_rc
                    )
                    if self.is_sport: status = "SPORT MODE: ACTIVE"
                    # Rate-limit RC sends to 10 Hz; transmit only on a
                    # noticeable change or as a 0.5 s heartbeat.
                    now = time.time()
                    if now - self.last_rc_time >= 0.1:
                        changed = any(abs(rc[i] - self._prev_rc[i]) > 1 for i in range(4))
                        heartbeat = now - self._last_heartbeat >= 0.5
                        if changed or heartbeat:
                            self.tello.send_rc_control(*rc)
                            self._prev_rc = list(rc)
                            self._last_heartbeat = now
                        self.last_rc_time = now
                else:
                    if self.is_taking_off: status = "TAKING OFF..."
                    elif self.takeoff_error: status = "TAKEOFF ERROR (LOW BAT?)"
                    elif self.emergency_stop: status = "STOPPED"
                    else: status = "READY / ON GROUND"
                try: bat = int(self.tello.get_battery())
                except: bat = 0
                HUD.draw(frame, status, self.emergency_stop, self.is_locked, self.lock_trigger,
                         self.current_height, 0.0, self.current_yaw, bat, depth_map_vis, zones)
                cv2.imshow(Config.WIN_NAME, frame)
        except Exception:
            traceback.print_exc()
        finally:
            # Best-effort shutdown: land, close the client and all windows.
            self.is_running = False
            try: self.tello.land()
            except: pass
            self.tello.end()
            cv2.destroyAllWindows()

    def _handle_takeoff(self):
        """Start takeoff on a background thread so the UI loop keeps running."""
        if self.is_taking_off or self.is_flying: return
        self.is_taking_off = True
        self.takeoff_error = False
        def _task():
            try:
                print("[Flight] Takeoff command sent to SDK...")
                self.tello.takeoff()
                time.sleep(3)  # allow the drone to stabilise before enabling flight logic
                self.is_flying = True
                print("[Flight] In Air. Logic active.")
            except Exception as e:
                print(f"[Flight Error] Takeoff failed: {e}")
                self.takeoff_error = True
            finally:
                self.is_taking_off = False
        threading.Thread(target=_task, daemon=True).start()

    def _handle_input(self, key: int):
        """Map a cv2.waitKey code to state toggles and manual RC values."""
        if key == 13: self.is_running = False  # Enter: quit
        elif key == 32: self.emergency_stop = not self.emergency_stop  # Space
        elif key == ord('m'): self.is_manual = not self.is_manual
        elif key == ord('2'): self.is_sport = not self.is_sport  # Toggle Sport Mode
        elif key == ord('k'):
            # Arm/disarm the auto-lock trigger and drop any current lock.
            self.lock_trigger = not self.lock_trigger
            self.is_locked = False
            self.locked_person_features = None
        elif key == ord('t'):
            if not self.is_flying and not self.is_taking_off:
                self.emergency_stop = False
                self._handle_takeoff()
        elif key == ord('l'):
            self.is_flying = False
            self.is_taking_off = False
            try: self.tello.land()
            except: pass
        elif key == ord('1'): self.is_rotating = not self.is_rotating
        # Manual axes reset every frame; a held key re-sets them below.
        self.m_lr, self.m_fb, self.m_ud, self.m_yv = 0, 0, 0, 0
        if self.is_manual and not self.emergency_stop:
            s = 100
            if key == ord('w'): self.m_fb = s
            elif key == ord('s'): self.m_fb = -s
            elif key == ord('a'): self.m_lr = -s
            elif key == ord('d'): self.m_lr = s
            elif key == ord('r'): self.m_ud = s
            elif key == ord('f'): self.m_ud = -s
            elif key == ord('e'): self.m_yv = s
            elif key == ord('z'): self.m_yv = -s

52
drone_pilot/ui.py Normal file
View File

@@ -0,0 +1,52 @@
import cv2
import numpy as np
from typing import Dict
from .config import Config, Colors
class HUD:
    """Draws the heads-up display overlay in place onto a BGR video frame."""

    @staticmethod
    def draw(frame: np.ndarray,
             status: str,
             emergency_stop: bool,
             is_locked: bool,
             lock_trigger: bool,
             current_height: float,
             target_altitude: float,
             current_yaw: float,
             bat: int,
             depth_map_vis: np.ndarray,
             zones: Dict[str, bool]):
        """Render status bars, telemetry, the depth inset, the 3-zone obstacle
        radar, the STOP indicator, and the key legend onto ``frame``.

        Mutates ``frame`` in place; ``depth_map_vis`` may be None when the
        depth model is unavailable.
        """
        # Dark bars top and bottom give the text contrast on any background.
        cv2.rectangle(frame, (0, 0), (Config.WIDTH, 90), Colors.HUD_BG, -1)
        cv2.rectangle(frame, (0, Config.HEIGHT-90), (Config.WIDTH, Config.HEIGHT), Colors.HUD_BG, -1)
        stat_color = Colors.RED if "AVOIDING" in status or emergency_stop else Colors.GREEN
        if is_locked:
            stat_color = Colors.BLUE
        cv2.putText(frame, f"STATUS: {status}", (25, 40), cv2.FONT_HERSHEY_DUPLEX, 1.1, stat_color, 2)
        # 'o' stands in for the degree sign, which Hershey fonts cannot render.
        cv2.putText(frame, f"ALT: {current_height:.1f}m | BAT: {bat}% | YAW: {int(current_yaw)}o", (25, 75), cv2.FONT_HERSHEY_SIMPLEX, 0.7, Colors.WHITE, 2)
        if depth_map_vis is not None:
            # Picture-in-picture inset of the colorized depth map.
            mini = cv2.resize(depth_map_vis, (200, 150))
            frame[100:250, 25:225] = mini
            cv2.rectangle(frame, (25, 100), (225, 250), Colors.WHITE, 2)
            cv2.putText(frame, "AI DEPTH", (25, 275), cv2.FONT_HERSHEY_SIMPLEX, 0.6, Colors.WHITE, 2)
        radar_x = Config.WIDTH - 220
        for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]):
            short_name = name[0]
            active = zones[name]
            col = Colors.RED if active else Colors.GREEN
            # Fixed-width 60px boxes on a 70px pitch.
            # (Was `(radar_x + (i+1)*60, 150)`, which shrank each successive box.)
            cv2.rectangle(frame, (radar_x + i*70, 100), (radar_x + i*70 + 60, 150), col, -1 if active else 2)
            cv2.putText(frame, short_name, (radar_x + 20 + i*70, 135), cv2.FONT_HERSHEY_DUPLEX, 0.8, Colors.WHITE, 2)
        # STOP pad: red when armed for flight, yellow while the stop is engaged.
        stop_col = Colors.RED if not emergency_stop else (0, 255, 255)
        cv2.rectangle(frame, (Config.WIDTH - 250, Config.HEIGHT - 80), (Config.WIDTH - 25, Config.HEIGHT - 10), stop_col, -1)
        cv2.putText(frame, "STOP", (Config.WIDTH - 200, Config.HEIGHT - 35), cv2.FONT_HERSHEY_DUPLEX, 0.8, Colors.WHITE, 2)
        # Key legend, laid out 5 entries per row.
        keys = ["T:Takeoff", "L:Land", "M:Manual", "K:LockOn", "R:Up", "F:Down", "1:Rotate", "Space:Stop", "ENTER:Quit"]
        for i, k in enumerate(keys):
            cv2.putText(frame, k, (25 + (i%5)*200, Config.HEIGHT - 55 + (i//5)*35), cv2.FONT_HERSHEY_SIMPLEX, 0.6, Colors.WHITE, 2)
        if lock_trigger:
            cv2.putText(frame, "LOCK TRIGGER ACTIVE: FINDING FACE...", (25, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, Colors.BLUE, 2)

218
drone_pilot/vision.py Normal file
View File

@@ -0,0 +1,218 @@
import cv2
import torch
import numpy as np
import onnxruntime as ort
from PIL import Image
from typing import List, Tuple, Dict
from .config import Config
class DroneVision:
    """Perception stack for the drone: face detection (YuNet + Haar fallback),
    person detection (SSDLite), person re-identification embeddings, and
    monocular depth for a 3-zone (LEFT/CENTER/RIGHT) obstacle radar.

    Model loading is best-effort: any model that fails to load leaves the
    corresponding attribute/session as ``None`` and the matching method
    degrades to a no-op result instead of raising.
    """

    def __init__(self):
        # 1. Face Detection: YuNet (ONNX) + Haar Cascade Fallback
        self.face_detector = None
        try:
            self.face_detector = cv2.FaceDetectorYN.create(
                "models/face_detection_yunet.onnx",
                "",
                (320, 320),
                0.4,    # score threshold
                0.3,    # NMS threshold
                5000    # top_k
            )
            print("[AI] YuNet Face Detector loaded.")
        except Exception as e:
            print(f"[AI Warning] YuNet failed to load: {e}")
        # Haar cascades always load from the cv2 install; used as a fallback.
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.profile_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml')
        # 2. ONNX Runtime Sessions (depth estimation + person re-ID)
        try:
            providers = ['CPUExecutionProvider']
            if 'CUDAExecutionProvider' in ort.get_available_providers():
                providers.insert(0, 'CUDAExecutionProvider')  # prefer GPU when present
            self.depth_session = ort.InferenceSession("models/midas_small.onnx", providers=providers)
            self.reid_session = ort.InferenceSession("models/reid_mobilenet.onnx", providers=providers)
            print("[AI] Depth & ReID (ONNX) initialized.")
        except Exception as e:
            print(f"[AI Error] ONNX initialization failed: {e}")
            self.depth_session = None
            self.reid_session = None
        # 3. Person Detection (SSD Lite Torch)
        try:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            from torchvision.models.detection import ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights
            weights = SSDLite320_MobileNet_V3_Large_Weights.DEFAULT
            self.person_model = ssdlite320_mobilenet_v3_large(weights=weights, box_score_thresh=Config.PERSON_CONF_THRESHOLD).to(self.device)
            self.person_model.eval()
            self.person_preprocess = weights.transforms()
        except Exception as e:
            print(f"[AI Error] Person detector failed: {e}")
            self.person_model = None
        # Outputs updated in place by estimate_depth_and_radar().
        self.depth_map_vis = None
        self.zones = {"LEFT": False, "CENTER": False, "RIGHT": False}
        self.zone_scores = {"LEFT": 0.0, "CENTER": 0.0, "RIGHT": 0.0}

    def _detect_haar(self, img_gray: np.ndarray) -> List[Tuple]:
        """Run frontal + profile Haar cascades on a grayscale image and merge
        the results, dropping profile hits that overlap a frontal hit."""
        faces = list(self.face_cascade.detectMultiScale(img_gray, 1.1, 7, minSize=(30, 30)))
        profiles = self.profile_cascade.detectMultiScale(img_gray, 1.1, 8, minSize=(35, 35))
        for p in profiles:
            is_new = True
            px, py, pw, ph = p
            for (fx, fy, fw, fh) in faces:
                # Detections closer than half a box are treated as duplicates.
                if abs(px - fx) < fw/2 and abs(py - fy) < fh/2:
                    is_new = False
                    break
            if is_new:
                faces.append(tuple(p))
        return faces

    def detect_faces(self, frame: np.ndarray) -> List[Tuple]:
        """Detect faces as (x, y, w, h) boxes using YuNet, a Haar fallback,
        and a 2x-upscaled center crop to catch very distant faces."""
        h, w = frame.shape[:2]
        faces = []
        # YuNet Detection
        if self.face_detector is not None:
            try:
                self.face_detector.setInputSize((w, h))
                _, detections = self.face_detector.detect(frame)
                if detections is not None:
                    for det in detections:
                        faces.append(tuple(det[0:4].astype(int)))
            except Exception:  # was bare except: never mask KeyboardInterrupt
                pass
        # Haar Fallback
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (3, 3), 0)
        haar_faces = self._detect_haar(gray)
        for hf in haar_faces:
            is_new = True
            hx, hy, hw, hh = hf
            for (fx, fy, fw, fh) in faces:
                if abs(hx - fx) < fw/2 and abs(hy - fy) < fh/2:
                    is_new = False
                    break
            if is_new:
                faces.append(hf)
        # Center Zoom for extreme distance: upscale the middle 40% of the
        # frame 2x and re-detect, mapping boxes back to frame coordinates.
        zh, zw = int(h * 0.4), int(w * 0.4)
        cy, cx = h // 2, w // 2
        y1, y2 = cy - zh // 2, cy + zh // 2
        x1, x2 = cx - zw // 2, cx + zw // 2
        center_crop = frame[y1:y2, x1:x2]
        center_upscaled = cv2.resize(center_crop, (zw * 2, zh * 2), interpolation=cv2.INTER_LANCZOS4)
        # In Zoom mode, use YuNet if available, otherwise Haar
        if self.face_detector is not None:
            try:
                uh, uw = center_upscaled.shape[:2]
                self.face_detector.setInputSize((uw, uh))
                _, zoom_detections = self.face_detector.detect(center_upscaled)
                if zoom_detections is not None:
                    for det in zoom_detections:
                        zx, zy, zw_f, zh_f = det[0:4]
                        # Undo the 2x upscale and shift back by the crop origin.
                        rx, ry = int(zx / 2) + x1, int(zy / 2) + y1
                        rw, rh = int(zw_f / 2), int(zh_f / 2)
                        is_new = True
                        for (fx, fy, fw, fh) in faces:
                            if abs(rx - fx) < fw/2 and abs(ry - fy) < fh/2:
                                is_new = False
                                break
                        if is_new:
                            faces.append((rx, ry, rw, rh))
            except Exception:
                pass
        # Always run Haar on zoom for robustness
        zoom_gray = cv2.cvtColor(center_upscaled, cv2.COLOR_BGR2GRAY)
        zoom_haar = self._detect_haar(zoom_gray)
        for (zx, zy, zw_f, zh_f) in zoom_haar:
            rx, ry = int(zx / 2) + x1, int(zy / 2) + y1
            rw, rh = int(zw_f / 2), int(zh_f / 2)
            is_new = True
            for (fx, fy, fw, fh) in faces:
                if abs(rx - fx) < fw/2 and abs(ry - fy) < fh/2:
                    is_new = False
                    break
            if is_new:
                faces.append((rx, ry, rw, rh))
        return faces

    def detect_persons(self, frame: np.ndarray) -> List[Tuple]:
        """Detect persons (COCO label 1) and return (x, y, w, h) boxes.

        Returns [] when the detector failed to load.
        """
        if self.person_model is None:
            return []
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(img_rgb)
        input_tensor = self.person_preprocess(pil_img).to(self.device).unsqueeze(0)
        with torch.no_grad():
            output = self.person_model(input_tensor)[0]
        persons = []
        for i in range(len(output['labels'])):
            if output['labels'][i] == 1 and output['scores'][i] > Config.PERSON_CONF_THRESHOLD:
                box = output['boxes'][i].cpu().numpy().astype(int)
                # torchvision boxes are (x1, y1, x2, y2); convert to (x, y, w, h).
                persons.append((box[0], box[1], box[2]-box[0], box[3]-box[1]))
        return persons

    def extract_person_features(self, frame: np.ndarray, box: Tuple) -> np.ndarray:
        """Compute a flattened ReID embedding for the person inside ``box``,
        or None when the session is unavailable or the box is degenerate."""
        if self.reid_session is None:
            return None
        try:
            x, y, w, h = box
            # Clamp the box to the frame and reject empty crops.
            x1, y1 = max(0, x), max(0, y)
            x2, y2 = min(frame.shape[1], x+w), min(frame.shape[0], y+h)
            if x2 <= x1 or y2 <= y1:
                return None
            crop = frame[y1:y2, x1:x2]
            img_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
            img_resized = cv2.resize(img_rgb, (224, 224))
            img_float = img_resized.astype(np.float32) / 255.0
            # Standard ImageNet normalization.
            mean = np.array([0.485, 0.456, 0.406])
            std = np.array([0.229, 0.224, 0.225])
            img_norm = (img_float - mean) / std
            img_input = np.transpose(img_norm, (2, 0, 1)).astype(np.float32)  # HWC -> CHW
            img_input = np.expand_dims(img_input, axis=0)
            ort_inputs = {self.reid_session.get_inputs()[0].name: img_input}
            features = self.reid_session.run(None, ort_inputs)[0]
            return features.flatten()
        except Exception:  # was bare except: never mask KeyboardInterrupt
            return None

    def compare_features(self, feat1: np.ndarray, feat2: np.ndarray) -> float:
        """Cosine similarity of two embeddings; 0.0 when either is unusable."""
        if feat1 is None or feat2 is None:
            return 0.0
        try:
            norm1 = np.linalg.norm(feat1)
            norm2 = np.linalg.norm(feat2)
            if norm1 == 0 or norm2 == 0:
                return 0.0
            return np.dot(feat1, feat2) / (norm1 * norm2)
        except Exception:
            return 0.0

    def estimate_depth_and_radar(self, frame: np.ndarray):
        """Run the depth model on the middle horizontal band of the frame and
        update ``depth_map_vis``, ``zone_scores`` and the LEFT/CENTER/RIGHT
        obstacle flags in place. No-op when the depth session is unavailable;
        on failure the last good radar state is kept."""
        if self.depth_session is None:
            return
        try:
            h, w = frame.shape[:2]
            # Analyze only the vertical middle half — sky/floor are irrelevant.
            slice_y1, slice_y2 = int(h*0.25), int(h*0.75)
            analysis_area = frame[slice_y1:slice_y2, :]
            img_rgb = cv2.cvtColor(analysis_area, cv2.COLOR_BGR2RGB)
            img_resized = cv2.resize(img_rgb, (256, 256))
            img_float = img_resized.astype(np.float32) / 255.0
            mean = np.array([0.485, 0.456, 0.406])
            std = np.array([0.229, 0.224, 0.225])
            img_norm = (img_float - mean) / std
            img_input = np.transpose(img_norm, (2, 0, 1)).astype(np.float32)
            img_input = np.expand_dims(img_input, axis=0)
            ort_inputs = {self.depth_session.get_inputs()[0].name: img_input}
            pred = self.depth_session.run(None, ort_inputs)[0][0]
            pred_upscaled = cv2.resize(pred, (analysis_area.shape[1], analysis_area.shape[0]), interpolation=cv2.INTER_CUBIC)
            out_norm = cv2.normalize(pred_upscaled, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
            self.depth_map_vis = cv2.applyColorMap(out_norm, cv2.COLORMAP_MAGMA)
            # Normalize relative depth to [0, 1]; for MiDaS-style inverse
            # depth, larger values mean closer obstacles.
            depth_map = pred_upscaled / (np.max(pred_upscaled) + 1e-5)
            dh, dw = depth_map.shape
            zone_w = dw // 3
            for i, name in enumerate(["LEFT", "CENTER", "RIGHT"]):
                zx1, zx2 = i * zone_w, (i + 1) * zone_w
                # Mean proximity over the vertical middle half of each zone.
                score = np.mean(depth_map[dh//4:3*dh//4, zx1:zx2])
                self.zone_scores[name] = score
                self.zones[name] = score > Config.DEPTH_THRESHOLD
        except Exception:  # was bare except: never mask KeyboardInterrupt
            pass

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,8 +1,8 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
# Create a Tello instance
tello = TelloSimClient()

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,5 +1,5 @@
from typing import Callable
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
print(TelloSimClient)

View File

@@ -1,7 +1,7 @@
import time
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
# Create a Tello instance

View File

@@ -1,7 +1,7 @@
import time
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
# Create a Tello instance
tello = TelloSimClient()

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import cv2
# Create a Tello instance

View File

@@ -2,7 +2,7 @@ import os
import cv2
import time
import numpy as np
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
# Initialize and connect
tello = TelloSimClient()
@@ -20,12 +20,16 @@ tello.streamoff()
# Prepare directory to save
script_dir = os.path.dirname(__file__)
artifact_folder_path = os.path.join(script_dir, "../../artifacts/images")
artifact_folder_path = os.path.join(os.path.dirname(script_dir), "artifacts", "images")
os.makedirs(artifact_folder_path, exist_ok=True)
print("[Example] Saving captured picture to:", artifact_folder_path)
# Save the frame
save_path = os.path.join(artifact_folder_path, "picture.png")
cv2.imwrite(save_path, np.array(frame_read.frame))
# Land
tello.land()

View File

@@ -2,7 +2,7 @@ import os
from threading import Thread
import time
from typing import cast
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import cv2

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,4 +1,4 @@
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,6 +1,6 @@
# Not verified working on normal Tello!
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

View File

@@ -1,5 +1,5 @@
"Here try to create a script that navigates the drone through a course."
from tello_sim.tello_sim_client import TelloSimClient
from tello_sim_client import TelloSimClient
import time

File diff suppressed because one or more lines are too long

BIN
models/midas_small.onnx Normal file

Binary file not shown.

Binary file not shown.

BIN
models/mobilenet_v3.onnx Normal file

Binary file not shown.

Binary file not shown.

BIN
models/reid_mobilenet.onnx Normal file

Binary file not shown.

Binary file not shown.

0
models/yolov8n.onnx Normal file
View File

View File

@@ -1,4 +1,10 @@
ursina
PyOpenGL
numpy
opencv-python
ursina==7.0.0
PyOpenGL==3.1.9
numpy>=2.0.0
opencv-python>=4.10.0.0
opencv-contrib-python>=4.10.0.0
djitellopy>=2.5.0
onnxruntime>=1.19.0
torch>=2.0.0
torchvision>=0.15.0
Pillow>=10.0.0

6
run.py Normal file
View File

@@ -0,0 +1,6 @@
from drone_pilot.main import FaceTrackingApp

if __name__ == "__main__":
    # use_real_tello=True drives the physical drone over Wi-Fi;
    # set it to False to fly against the simulator instead.
    app = FaceTrackingApp(use_real_tello=True)
    app.run()

View File

@@ -1,7 +1,7 @@
import os
import socket
from ursina import *
from cv2.typing import MatLike
import errno
from ursina import * # type: ignore
from time import time
import cv2
from ursina_adapter import UrsinaAdapter
@@ -17,10 +17,34 @@ class CommandServer:
self.stream_active = False
self.last_altitude = 0
self._recording_folder = "output/recordings"
self.server_socket = None
if not os.path.exists(self._recording_folder):
os.makedirs(self._recording_folder)
def check_port_available(self, port: int = 9999) -> bool:
"""
Check if the specified port is available.
Returns True if available, False if in use.
"""
try:
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind(('127.0.0.1', port))
test_socket.close()
return True
except OSError:
return False
def cleanup(self):
"""Clean up resources and close the server socket."""
if self.server_socket:
try:
self.server_socket.close()
print("[Command Listener] Server socket closed.")
except Exception as e:
print(f"[Command Listener] Error closing socket: {e}")
def streamon(self):
"""Start capturing screenshots and enable FPV video preview."""
if not self.stream_active:
@@ -60,16 +84,35 @@ class CommandServer:
"""
Listens for commands to send to the Simulator
"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 9999)) # Port number for communication
server.listen(5)
print("[Command Listener] Listening on port 9999...")
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.server_socket.bind(('127.0.0.1', 9999)) # Port number for communication
self.server_socket.listen(5)
print("[Command Listener] Listening on port 9999...")
except OSError as e:
if e.errno == errno.EADDRINUSE: # Address already in use
print("\n" + "="*70)
print("ERROR: Port 9999 is already in use!")
print("="*70)
print("\nAnother instance of the simulator may be running.")
print("\nTo fix this, run one of these commands in your terminal:")
print(" macOS/Linux: lsof -ti:9999 | xargs kill -9")
print(" Windows: netstat -ano | findstr :9999")
print(" taskkill /PID <PID> /F")
print("\nOr simply restart your computer.")
print("="*70 + "\n")
raise
else:
raise
while True:
conn, _ = server.accept()
data = conn.recv(1024).decode()
if data:
print(f"[Command Listener] Received command: {data}")
try:
while True:
conn, _ = self.server_socket.accept()
data = conn.recv(1024).decode()
if data:
print(f"[Command Listener] Received command: {data}")
if data == "connect":
self._ursina_adapter.connect()
@@ -202,13 +245,25 @@ class CommandServer:
conn.send(state.encode())
elif data == "get_latest_frame":
# Save the frame to disk first
frame_path = os.path.join(self._recording_folder, "latest_frame.png")
# Send frame data directly over TCP instead of using filesystem
if self._ursina_adapter.latest_frame is not None:
cv2.imwrite(frame_path, self._ursina_adapter.latest_frame)
conn.send(frame_path.encode())
# Encode frame as PNG in memory
success, buffer = cv2.imencode('.png', self._ursina_adapter.latest_frame)
if success:
# Send frame size first (4 bytes)
frame_data = buffer.tobytes()
frame_size = len(frame_data)
conn.send(frame_size.to_bytes(4, byteorder='big'))
# Then send the actual frame data
conn.send(frame_data)
print(f"[Frame Transfer] Sent {frame_size} bytes over TCP")
else:
# Send 0 size to indicate no frame
conn.send((0).to_bytes(4, byteorder='big'))
else:
conn.send(b"N/A")
# Send 0 size to indicate no frame available
conn.send((0).to_bytes(4, byteorder='big'))
elif data == "capture_frame":
self._ursina_adapter.capture_frame()
elif data.startswith("set_speed"):
@@ -221,4 +276,11 @@ class CommandServer:
elif data == "end":
self.end()
conn.close()
conn.close()
except KeyboardInterrupt:
print("\n[Command Listener] Shutting down...")
self.cleanup()
except Exception as e:
print(f"[Command Listener] Error: {e}")
self.cleanup()
raise

View File

@@ -1,21 +1,69 @@
from command_server import CommandServer
from ursina_adapter import UrsinaAdapter
import threading
import atexit
import signal
import sys
class TelloDroneSim:
def __init__(self):
self._ursina_adapter = UrsinaAdapter()
self._server = CommandServer(self._ursina_adapter)
# Register cleanup handlers
atexit.register(self.cleanup)
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle termination signals gracefully."""
print("\n[Tello Sim] Received shutdown signal, cleaning up...")
self.cleanup()
sys.exit(0)
def cleanup(self):
"""Clean up resources."""
if hasattr(self, '_server'):
self._server.cleanup()
@property
def state(self):
return self._ursina_adapter
def start(self):
# Check if port is available before starting
if not self._server.check_port_available(9999):
print("\n" + "="*70)
print("ERROR: Cannot start simulator - Port 9999 is already in use!")
print("="*70)
print("\nAnother instance of the simulator may be running.")
print("\nTo fix this, run one of these commands in your terminal:")
print(" macOS/Linux: lsof -ti:9999 | xargs kill -9")
print(" Windows: netstat -ano | findstr :9999")
print(" taskkill /PID <PID> /F")
print("\nOr simply restart your computer.")
print("="*70 + "\n")
sys.exit(1)
server_thread = threading.Thread(target=self._server.listen)
server_thread.daemon = True
server_thread.start()
self._ursina_adapter.run()
try:
self._ursina_adapter.run()
except KeyboardInterrupt:
print("\n[Tello Sim] Interrupted, cleaning up...")
self.cleanup()
except Exception as e:
print(f"[Tello Sim] Error: {e}")
self.cleanup()
raise
def update(self) -> None:
self._ursina_adapter.tick()
self._ursina_adapter.tick()
if __name__ == "__main__":
sim = TelloDroneSim()
def update():
sim.update()
sim.start()

View File

@@ -1,222 +0,0 @@
from dataclasses import dataclass
import logging
import subprocess
import platform
import sys
import time
import socket
import cv2
import os
import numpy as np
@dataclass
class BackgroundFrameRead():
    # Lightweight stand-in for djitellopy's BackgroundFrameRead: only carries
    # the most recent video frame returned by get_frame_read().
    frame: cv2.typing.MatLike
class TelloSimClient:
    """TCP client for the Ursina-based Tello simulator.

    Mirrors the djitellopy ``Tello`` API (same method names) so example
    scripts can switch between the simulator and a real drone unchanged.
    Each command/query opens a short-lived socket to the simulator's
    command server.
    """

    def __init__(self, host='localhost', port=9999, auto_start_simulation=True):
        self.host = host
        self.port = port
        self.latest_frame = None
        # Optionally spawn the simulator process if nothing is listening yet.
        if auto_start_simulation and not self._check_simulation_running():
            self._start_simulation()
            print("[Wrapper] Starting Tello Simulation...")
            self._wait_for_simulation()

    def _start_simulation(self):
        """Launch the simulator process in a platform-appropriate way."""
        sim_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tello_drone_sim.py'))
        if platform.system() == "Windows":
            command = f'start cmd /k python "{sim_path}"'
            print("[DEBUG] Launching simulation command:", command)
            subprocess.Popen(command, shell=True)
        elif platform.system() == "Linux":
            # Use the resolved absolute path so this works from any CWD
            # (was a bare relative 'tello_drone_sim.py').
            subprocess.Popen(['gnome-terminal', '--', 'python3', sim_path])
        elif platform.system() == "Darwin":
            python_path = os.path.join(os.path.dirname(sys.executable), 'python3')
            print("Running python3 from path:", python_path)
            # Removed leftover debug Popen(['ls']) / Popen(['pwd']) calls and
            # use the resolved absolute script path.
            subprocess.Popen([python_path, sim_path], cwd=os.getcwd(),
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                             start_new_session=True)
        else:
            raise OSError("Unsupported OS for launching terminal simulation.")

    def _check_simulation_running(self):
        """Return True when the simulator accepts TCP connections."""
        try:
            with socket.create_connection((self.host, self.port), timeout=1):
                return True
        except (ConnectionRefusedError, socket.timeout, OSError) as ex:
            # Lazy %-formatting; the original passed `ex` as a stray
            # positional argument with no placeholder, which made the
            # logging module emit an internal formatting error.
            logging.error("[Wrapper] Simulation is not running: %s", ex)
            return False

    def _wait_for_simulation(self, timeout=30):
        """Poll until the simulator is reachable or ``timeout`` seconds pass."""
        print("[Wrapper] Waiting for simulation to become ready...")
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self._check_simulation_running():
                print("[Wrapper] Simulation is now ready!")
                return
            time.sleep(1)
        raise TimeoutError("[Error] Simulation did not become ready in time.")

    def _send_command(self, command: str):
        """Fire-and-forget a command string to the simulator."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.send(command.encode())
        except ConnectionRefusedError:
            print(f"[Error] Unable to connect to the simulation at {self.host}:{self.port}")

    def get_frame_read(self) -> "BackgroundFrameRead":
        """Fetch the latest FPV frame (RGB); falls back to a black 640x360 image.

        The simulator replies with a path to the frame saved on disk.
        (String annotation keeps this class importable on its own.)
        """
        frame_path = self._request_data('get_latest_frame')
        if frame_path != "N/A" and os.path.exists(frame_path):
            image = cv2.imread(frame_path)
            if image is not None:
                return BackgroundFrameRead(frame=cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))

    def _request_data(self, command):
        """Send a command and return the decoded reply, or "N/A" on failure."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.send(command.encode())
                return s.recv(4096).decode()
        except ConnectionRefusedError:
            print(f"[Error] Unable to retrieve '{command}' from {self.host}:{self.port}")
            return "N/A"

    def wait_until_motion_complete(self):
        """Block until the simulator reports that motion has finished."""
        while self._request_data("get_is_moving") == "True":
            time.sleep(0.1)

    # --- Telemetry queries (returned as strings from the simulator) ---------
    def get_battery(self):
        return self._request_data('get_battery')

    def get_distance_tof(self):
        return self._request_data('get_distance_tof')

    def get_height(self):
        return self._request_data('get_height')

    def get_flight_time(self):
        return self._request_data('get_flight_time')

    def get_speed_x(self):
        return self._request_data('get_speed_x')

    def get_speed_y(self):
        return self._request_data('get_speed_y')

    def get_speed_z(self):
        return self._request_data('get_speed_z')

    def get_acceleration_x(self):
        return self._request_data('get_acceleration_x')

    def get_acceleration_y(self):
        return self._request_data('get_acceleration_y')

    def get_acceleration_z(self):
        return self._request_data('get_acceleration_z')

    def get_pitch(self):
        return self._request_data('get_pitch')

    def get_roll(self):
        return self._request_data('get_roll')

    def get_yaw(self):
        return self._request_data('get_yaw')

    def query_attitude(self):
        return self._request_data('query_attitude')

    def get_current_state(self):
        return self._request_data('get_current_state')

    # --- Flight commands (fire-and-forget) ----------------------------------
    def connect(self):
        self._send_command('connect')

    def takeoff(self):
        self._send_command('takeoff')

    def land(self):
        self._send_command('land')

    def rotate_clockwise(self, degrees):
        self._send_command(f'rotate_cw {degrees}')

    def rotate_counter_clockwise(self, degrees):
        self._send_command(f'rotate_ccw {degrees}')

    def streamon(self):
        self._send_command('streamon')

    def streamoff(self):
        self._send_command('streamoff')

    def capture_frame(self):
        self._send_command('capture_frame')

    def emergency(self):
        self._send_command('emergency')

    def move_forward(self, distance):
        self._send_command(f'forward {distance}')

    def move_back(self, distance):
        self._send_command(f'backward {distance}')

    def move_left(self, distance):
        self._send_command(f'left {distance}')

    def move_right(self, distance):
        self._send_command(f'right {distance}')

    def move_up(self, distance):
        self._send_command(f'up {distance}')

    def move_down(self, distance):
        self._send_command(f'down {distance}')

    def flip_left(self):
        self._send_command('flip_left')

    def flip_right(self):
        self._send_command('flip_right')

    def flip_forward(self):
        self._send_command('flip_forward')

    def flip_back(self):
        self._send_command('flip_back')

    def go_xyz_speed(self, x, y, z, speed):
        self._send_command(f"go {x} {y} {z} {speed}")

    def curve_xyz_speed(self, x1, y1, z1, x2, y2, z2, speed):
        self._send_command(f"curve {x1} {y1} {z1} {x2} {y2} {z2} {speed}")

    def set_speed(self, speed):
        self._send_command(f"set_speed {speed}")

    def send_rc_control(self, left_right_velocity, forward_backward_velocity, up_down_velocity, yaw_velocity):
        self._send_command(f"send_rc_control {left_right_velocity} {forward_backward_velocity} {up_down_velocity} {yaw_velocity}")

    def end(self):
        self._send_command('end')

    def get_info(self):
        """Request the simulator's info blob; returns "{}" on failure."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.send(b'get_info')
                return s.recv(4096).decode()
        except ConnectionRefusedError:
            print(f"[Error] Unable to retrieve info from {self.host}:{self.port}")
            return "{}"

    def initiate_throw_takeoff(self):
        self._send_command('throw_takeoff')

View File

@@ -4,7 +4,6 @@ from OpenGL.GL import glReadPixels, GL_RGBA, GL_UNSIGNED_BYTE
import numpy as np
from typing import Literal
import cv2
import numpy as np
from ursina import (
Ursina,
window,
@@ -163,7 +162,6 @@ class UrsinaAdapter():
scale=(15, 15, 12),
position=(-123, 0.0, 260),
rotation=(0, 0, 0),
collider='none',
cast_shadow=True
)
@@ -176,12 +174,20 @@ class UrsinaAdapter():
cast_shadow=True
)
self.walking_person = Entity(
model='entities/business_man.glb',
scale=7.3,
position=(0, 13.0, 40),
rotation=(0, 90, 0),
collider='box',
cast_shadow=True
)
self.light1 = Entity(
model='entities/street_light.glb',
scale=(4, 6.5, 5),
position=(-55, 2.5, 260),
rotation=(0, -90, 0),
collider='none',
cast_shadow=True
)
@@ -191,7 +197,6 @@ class UrsinaAdapter():
scale=(4, 6.5, 5),
position=(25, 2.5, 95),
rotation=(0, 90, 0),
collider='none',
cast_shadow=True
)
@@ -200,7 +205,6 @@ class UrsinaAdapter():
scale=(4, 6.5, 5),
position=(-55, 2.5, -70),
rotation=(0, -90, 0),
collider='none',
cast_shadow=True
)
@@ -221,8 +225,7 @@ class UrsinaAdapter():
model='entities/tunnel_3.glb',
scale=(63, 45, 45),
position=(-199, 3.0, 380),
rotation=(0, 0, 0),
collider='None',
rotation=(0, 0, 0),
cast_shadow=True
)
@@ -308,6 +311,12 @@ class UrsinaAdapter():
self.max_roll = 20
self.tilt_smoothness = 0.05
# RC control state
self.rc_lr = 0.0
self.rc_fb = 0.0
self.rc_ud = 0.0
self.rc_yaw = 0.0
self.create_meters()
def run(self):
@@ -483,6 +492,7 @@ class UrsinaAdapter():
return int(self.velocity.x * 3.6)
def get_speed_y(self) -> int:
from ursina import time as ursina_time
current_time = time()
elapsed_time = current_time - self.last_time
@@ -590,35 +600,90 @@ class UrsinaAdapter():
self.emergency()
def update_movement(self) -> None:
self.velocity += self.acceleration
import ursina
dt = ursina.time.dt
t = time()
# Update walking person movement
# Moves between -15 and 15 on X axis
walk_speed = 2.0
old_x = self.walking_person.x
self.walking_person.x = sin(t * 0.5) * 15
if self.velocity is None:
raise Exception("Velocity is None")
# Rotate person based on direction
if self.walking_person.x > old_x:
self.walking_person.rotation_y = 90
else:
self.walking_person.rotation_y = -90
if self.velocity.length() > self.max_speed:
self.velocity = self.velocity.normalized() * self.max_speed
# Apply RC control in world space based on local drone orientation
if self.is_flying:
# Tello RC values are -100 to 100.
speed_mult_h = 0.18 # Increased: 100 -> 18.0 m/s for faster forward
speed_mult_v = 0.05 # Vertical: 100 -> 5.0 m/s
# Robust local-to-world conversion using yaw angle
rad = np.radians(self.drone.rotation_y)
sin_y = np.sin(rad)
cos_y = np.cos(rad)
# vx = right-axis component, vz = forward-axis component
vx = (self.rc_fb * speed_mult_h * sin_y) + (self.rc_lr * speed_mult_h * cos_y)
vz = (self.rc_fb * speed_mult_h * cos_y) - (self.rc_lr * speed_mult_h * sin_y)
vy = self.rc_ud * speed_mult_v
target_velocity = Vec3(vx, vy, vz)
# Smoothly interpolate current velocity towards target
self.velocity = lerp(self.velocity, target_velocity, 10 * dt)
# Apply yaw rotation (degrees per second)
# Reduced significantly: multiplier from 3.5 to 1.2
self.drone.rotation_y += self.rc_yaw * 1.2 * dt
# Set target tilt angles for visual feedback
self.pitch_angle = (self.rc_fb / 100.0) * self.max_pitch
self.roll_angle = (self.rc_lr / 100.0) * self.max_roll
self.velocity *= self.drag
new_position = self.drone.position + self.velocity
hit_info = raycast(self.drone.position, self.velocity.normalized(), distance=self.velocity.length(), ignore=(self.drone,)) # type: ignore
# Acceleration (m/s^2) from move() commands
self.velocity += self.acceleration * dt
if self.velocity.length() > 20.0: # Max speed 20 m/s
self.velocity = self.velocity.normalized() * 20.0
if not hit_info.hit:
self.drone.position = new_position
# Apply drag when no active RC input
if abs(self.rc_fb) < 1 and abs(self.rc_lr) < 1 and abs(self.rc_ud) < 1:
self.velocity = lerp(self.velocity, Vec3(0,0,0), 3 * dt)
# Frame-independent position update
new_position = self.drone.position + self.velocity * dt
# Collision check
if self.velocity.length() > 0.01:
hit_info = raycast(self.drone.position + Vec3(0,0.1,0), self.velocity.normalized(),
distance=self.velocity.length() * dt + 0.1, ignore=(self.drone,)) # type: ignore
if not hit_info.hit:
self.drone.position = new_position
else:
self.drone.position = new_position
if self.drone.y < 3:
self.drone.y = 3
if self.drone.y < 2.6: # Ground level
self.drone.y = 2.6
if self.velocity.y < 0: self.velocity.y = 0
self.acceleration = Vec3(0, 0, 0)
# Apply pitch and roll to the drone
self.drone.rotation_x = lerp(self.drone.rotation_x, self.pitch_angle, self.tilt_smoothness)
self.drone.rotation_z = lerp(self.drone.rotation_z, self.roll_angle, self.tilt_smoothness)
current_time = time()
dt = current_time - self.last_time_accel
from time import time as wall_time
current_time = wall_time()
dt_accel = current_time - self.last_time_accel
if dt > 0:
if dt_accel > 0:
velocity_change = self.velocity - self.last_velocity_accel
self.calculated_acceleration = velocity_change / dt # type: ignore
self.calculated_acceleration = velocity_change / dt_accel # type: ignore
self.last_velocity_accel = Vec3(self.velocity.x, self.velocity.y, self.velocity.z)
self.last_time_accel = current_time
@@ -702,15 +767,11 @@ class UrsinaAdapter():
self.drone.rotation_z = lerp(self.drone.rotation_z, self.roll_angle, self.tilt_smoothness)
def send_rc_control(self, left_right_velocity_ms: float, forward_backward_velocity_ms: float, up_down_velocity_ms: float, yaw_velocity_ms: float):
self.velocity = Vec3(
-left_right_velocity_ms / 100, # LEFT/RIGHT mapped to X
up_down_velocity_ms / 100, # UP/DOWN mapped to Y
forward_backward_velocity_ms / 100 # FORWARD/BACKWARD mapped to Z
)
self.drone.rotation_y += -yaw_velocity_ms * 0.05 # Smooth yaw rotation
print(f"[RC Control] Velocities set -> LR: {left_right_velocity_ms}, FB: {forward_backward_velocity_ms}, UD: {up_down_velocity_ms}, Yaw: {yaw_velocity_ms}")
# Only store the target RC values to be applied in the main thread (update_movement)
self.rc_lr = left_right_velocity_ms
self.rc_fb = forward_backward_velocity_ms
self.rc_ud = up_down_velocity_ms
self.rc_yaw = yaw_velocity_ms
@staticmethod
def map_coords(x: float, y: float, z: float) -> Vec3:
@@ -791,6 +852,7 @@ class UrsinaAdapter():
def _motion_complete_callback(self):
"""Called when the current motion finishes: clear the busy flag, then start the next queued command."""
# Order matters: is_moving must be cleared before dispatching, so the
# next command does not see the drone as still busy.
self.is_moving = False
self._execute_next_command()
def land(self) -> None:
if self.is_moving:
print("Tello Simulator: Movement in progress. Deferring landing...")
@@ -828,7 +890,7 @@ class UrsinaAdapter():
def capture_frame(self):
"""Capture and save the latest FPV frame from update()"""
"""Capture the latest FPV frame. Optionally save to disk if save_frames_to_disk is True."""
if not self.stream_active:
print("[Capture] Stream not active. Cannot capture frame.")
return
@@ -837,11 +899,9 @@ class UrsinaAdapter():
print("[Capture] No latest frame available.")
return
frame_path = os.path.join(self.recording_folder, f"frame_{self.frame_count}.png")
cv2.imwrite(frame_path, self.latest_frame)
self.saved_frames.append(frame_path)
# Always increment frame count for tracking
self.frame_count += 1
print(f"[Capture] Screenshot {self.frame_count} saved: {frame_path}")
print(f"[Capture] Frame {self.frame_count} captured (memory only)")
def set_speed(self, x: int):
"""Set drone speed by adjusting acceleration force.
@@ -898,8 +958,9 @@ class UrsinaAdapter():
rolling = False
if self.bezier_mode:
t_now = time()
elapsed = t_now - self.bezier_start_time
from time import time as wall_time
t_now = wall_time()
elapsed = t_now - self.bezier_start_time # type: ignore
t = min(1.0, elapsed / self.bezier_duration)
# Bézier point
@@ -925,8 +986,6 @@ class UrsinaAdapter():
if self.stream_active:
self.capture_frame()
if not moving:
self.pitch_angle = 0 # Reset pitch when not moving
@@ -937,4 +996,5 @@ class UrsinaAdapter():
self.update_movement()
self.update_pitch_roll()
def update(self):
"""Ursina per-frame hook; delegates all per-frame work to tick()."""
self.tick()

148
tello_sim_client.py Normal file
View File

@@ -0,0 +1,148 @@
from dataclasses import dataclass
import subprocess
import platform
import sys
import time
import socket
import cv2
import os
import numpy as np
try:
from djitellopy import Tello
HAS_TELLO_LIB = True
except ImportError:
HAS_TELLO_LIB = False
@dataclass
class BackgroundFrameRead:
    """Lightweight stand-in for djitellopy's BackgroundFrameRead.

    Holds a single captured video frame so simulator and hardware code
    paths can expose the same `.frame` attribute.
    """

    # The most recent video frame as produced by OpenCV.
    frame: cv2.typing.MatLike
class TelloSimClient:
    """djitellopy-compatible client for the Tello simulator.

    Each command opens a short-lived TCP connection to the simulator
    (default 127.0.0.1:9999). When `use_real_tello` is True and djitellopy
    is installed, every call is proxied to real hardware instead, so
    application code runs unchanged in both modes.
    """

    def __init__(self, host='127.0.0.1', port=9999, auto_start_simulation=True, use_real_tello=False):
        """Set up the client and, if needed, launch the simulator.

        Args:
            host: Simulator TCP host.
            port: Simulator TCP port.
            auto_start_simulation: Spawn the simulator process when nothing
                is listening on (host, port).
            use_real_tello: Use real Tello hardware via djitellopy when the
                library is available; otherwise fall back to the simulator.
        """
        self.host = host
        self.port = port
        self.use_real_tello = use_real_tello
        self.real_drone = None
        if self.use_real_tello:
            if HAS_TELLO_LIB:
                print("[Client] Connecting to REAL Tello Hardware...")
                self.real_drone = Tello()
            else:
                # Hardware was requested but the library is missing:
                # degrade gracefully to the simulator instead of crashing.
                print("[Error] djitellopy not found! Falling back to Simulator.")
                self.use_real_tello = False
        if not self.use_real_tello:
            if auto_start_simulation and not self._check_simulation_running():
                self._start_simulation()
            self._wait_for_simulation()

    def connect(self):
        """Establish the control link (simulator: a no-op handshake command)."""
        if self.real_drone:
            self.real_drone.connect()
        else:
            self._send_command('connect')

    def takeoff(self):
        """Command the drone to take off."""
        if self.real_drone:
            self.real_drone.takeoff()
        else:
            self._send_command('takeoff')

    def land(self):
        """Command the drone to land."""
        if self.real_drone:
            self.real_drone.land()
        else:
            self._send_command('land')

    def streamon(self):
        """Enable the video stream."""
        if self.real_drone:
            self.real_drone.streamon()
        else:
            self._send_command('streamon')

    def streamoff(self):
        """Disable the video stream."""
        if self.real_drone:
            self.real_drone.streamoff()
        else:
            self._send_command('streamoff')

    def get_frame_read(self):
        """Return an object exposing the latest frame via a `.frame` attribute."""
        if self.real_drone:
            return self.real_drone.get_frame_read()
        return self  # Simulator acts as frame provider via TCP in get_frame_read-style

    @property
    def frame(self):
        """Helper for simulator to match BackgroundFrameRead structure"""
        return self._receive_tcp_frame()

    def send_rc_control(self, lr, fb, ud, yaw):
        """Send remote-control velocities (left/right, fwd/back, up/down, yaw)."""
        if self.real_drone:
            # djitellopy expects integer velocities.
            self.real_drone.send_rc_control(int(lr), int(fb), int(ud), int(yaw))
        else:
            self._send_command(f"send_rc_control {lr} {fb} {ud} {yaw}")

    def get_yaw(self):
        """Yaw reading; 0.0 when the simulator is unreachable."""
        if self.real_drone:
            return self.real_drone.get_yaw()
        return float(self._request_data('get_yaw') or 0)

    def get_height(self):
        """Height reading; 0.0 when the simulator is unreachable."""
        if self.real_drone:
            return self.real_drone.get_height()
        return float(self._request_data('get_height') or 0)

    def get_distance_tof(self):
        """Time-of-flight sensor reading; 0.0 when the simulator is unreachable."""
        if self.real_drone:
            return self.real_drone.get_distance_tof()
        return float(self._request_data('get_distance_tof') or 0)

    def get_battery(self):
        """Battery level; 0 when the simulator is unreachable."""
        if self.real_drone:
            return self.real_drone.get_battery()
        return int(self._request_data('get_battery') or 0)

    def end(self):
        """Shut down: stop the real drone's stream, or tell the simulator to end."""
        if self.real_drone:
            self.real_drone.streamoff()
        else:
            self._send_command('end')

    # --- Internal Simulator Logic ---
    def _start_simulation(self):
        """Launch the simulator as a detached child process."""
        sim_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tello_sim', 'tello_drone_sim.py'))
        print(f"[Wrapper] Launching Simulator: {sim_path}")
        clean_env = os.environ.copy()
        # Set Qt platform to xcb for better compatibility on Linux/Wayland
        clean_env["QT_QPA_PLATFORM"] = "xcb"
        # Clean PyCharm debugger vars so they do not leak into the child.
        for k in list(clean_env.keys()):
            if k.startswith('PYDEVD'):
                del clean_env[k]
        subprocess.Popen([sys.executable, sim_path], cwd=os.path.dirname(sim_path),
                         env=clean_env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)

    def _check_simulation_running(self):
        """Return True if something accepts connections on (host, port)."""
        try:
            with socket.create_connection((self.host, self.port), timeout=0.5):
                return True
        except OSError:  # refused / timed out / unreachable — narrowed from bare except
            return False

    def _wait_for_simulation(self, timeout=15):
        """Poll until the simulator answers, or give up after `timeout` seconds."""
        start = time.time()
        while time.time() - start < timeout:
            if self._check_simulation_running():
                return
            time.sleep(1)
        print("[Warning] Simulator not responding.")

    def _send_command(self, cmd):
        """Fire-and-forget: send `cmd` over a fresh TCP connection (best effort)."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                # sendall: plain send() may transmit only part of the buffer.
                s.sendall(cmd.encode())
        except OSError:
            pass  # simulator not reachable; commands are best-effort by design

    def _request_data(self, cmd):
        """Send `cmd` and return the simulator's text reply, or None on failure."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.sendall(cmd.encode())
                return s.recv(1024).decode()
        except OSError:
            return None

    @staticmethod
    def _recv_exact(sock, n):
        """Read exactly `n` bytes from `sock`; None if the peer closes early."""
        buf = b''
        while len(buf) < n:
            chunk = sock.recv(min(n - len(buf), 4096))
            if not chunk:
                return None
            buf += chunk
        return buf

    def _receive_tcp_frame(self):
        """Fetch one length-prefixed encoded frame from the simulator and decode it.

        Returns the decoded image, or None when the simulator is unreachable
        or the transfer is incomplete.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.sendall(b'get_latest_frame')
                # 4-byte big-endian length prefix. A single recv(4) may
                # legally return fewer bytes, so use the exact-read loop.
                size_data = self._recv_exact(s, 4)
                if size_data is None:
                    return None
                size = int.from_bytes(size_data, 'big')
                data = self._recv_exact(s, size)
                if data is None:
                    return None  # truncated transfer: do not decode partial data
            return cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
        except OSError:
            return None