1. Python GUI 프로그램
기존 2.x 버전의 SDK 예제 GUI 프로그램을 3.x로 변경한 것이다. 이 프로그램은 라이브러리 문제와 문법 차이가 있어 바로 3.x 파이썬에서 실행되지 않는다. 3.x 버전의 GUI 예제 프로그램을 검색으로 찾지를 못하여 기존 2.x 버전의 GUI 프로그램을 3.x로 수정 변경하였다. [참고]https://github.com/damiafuentes/DJITelloPy

2. main.py
import tello
from tello_control_ui import TelloUI
def main():
drone = tello.Tello('0.0.0.0', 8889)
vplayer = TelloUI(drone,"./img/")
# start the Tkinter mainloop
vplayer.root.mainloop()
if __name__ == "__main__":
main()
3. tello_control_ui.py
#!/usr/bin/python3
import os
from PIL import Image
from PIL import ImageTk
#import Tkinter as tki ==> import tkinter as tki
import tkinter as tki
#from Tkinter import Toplevel, Scale ==>from tkinter import Toplevel, Scale
from tkinter import Toplevel, Scale
import threading
import datetime
import cv2 # pip3 install opencv-python --upgrade --force-reinstall
import time
import platform
class TelloUI:
def __init__(self,tello,outputpath):
"""
Initial all the element of the GUI,support by Tkinter
:param tello: class interacts with the Tello drone.
Raises:
RuntimeError: If the Tello rejects the attempt to enter command mode.
"""
self.tello = tello # videostream device
self.outputPath = outputpath # the path that save pictures created by clicking the takeSnapshot button
#self.frame = None # frame read from h264decoder and used for pose recognition
self.frame = cv2.imread('tello.png')
self.thread = None # thread of the Tkinter mainloop
self.stopEvent = None
# control variables
self.distance = 0.1 # default distance for 'move' cmd
self.degree = 30 # default degree for 'cw' or 'ccw' cmd
# if the flag is TRUE,the auto-takeoff thread will stop waiting for the response from tello
self.quit_waiting_flag = False
# initialize the root window and image panel
self.root = tki.Tk()
self.panel = None
# create buttons
self.btn_snapshot = tki.Button(self.root, text="Snapshot!", command=self.takeSnapshot)
self.btn_snapshot.pack(side="bottom", fill="both", expand="yes", padx=10, pady=5)
self.btn_pause = tki.Button(self.root, text="Pause", relief="raised", command=self.pauseVideo)
self.btn_pause.pack(side="bottom", fill="both", expand="yes", padx=10, pady=5)
self.btn_landing = tki.Button(self.root, text="Open Command Panel", relief="raised", command=self.openCmdWindow)
self.btn_landing.pack(side="bottom", fill="both", expand="yes", padx=10, pady=5)
# start a thread that constantly pools the video sensor for
# the most recently read frame
self.stopEvent = threading.Event()
self.thread = threading.Thread(target=self.videoLoop, args=())
self.thread.start()
# set a callback to handle when the window is closed
self.root.wm_title("TELLO Controller")
self.root.wm_protocol("WM_DELETE_WINDOW", self.onClose)
# the sending_command will send command to tello every 5 seconds
self.sending_command_thread = threading.Thread(target = self._sendingCommand)
def videoLoop(self):
"""
The mainloop thread of Tkinter
Raises:
RuntimeError: To get around a RunTime error that Tkinter throws due to threading.
"""
try:
# start the thread that get GUI image and drwa skeleton
time.sleep(0.5)
self.sending_command_thread.start()
system = platform.system()
while not self.stopEvent.is_set():
# read the frame for GUI show
self.frame = self.tello.read()
# we found compatibility problem between Tkinter,PIL and Macos,and it will
# sometimes result the very long preriod of the "ImageTk.PhotoImage" function,
# so for Macos,we start a new thread to execute the _updateGUIImage function.
if self.frame is None or self.frame.size == 0:
#print("videoLoop self.frame is None")
# transfer the format from frame to image
continue
else:
image = Image.fromarray(self.frame)
if system =="Windows" or system =="Linux":
self._updateGUIImage(image)
else:
thread_tmp = threading.Thread(target=self._updateGUIImage,args=(image,))
thread_tmp.start()
time.sleep(0.03)
except Exception as e:
print("[INFO] caught a RuntimeError"+str(e))
def _updateGUIImage(self,image):
"""
Main operation to initial the object of image,and update the GUI panel
"""
if image is None:
return
image = ImageTk.PhotoImage(image)
# if the panel none ,we need to initial it
if self.panel is None:
self.panel = tki.Label(image=image)
self.panel.image = image
self.panel.pack(side="left", padx=10, pady=10)
# otherwise, simply update the panel
else:
self.panel.configure(image=image)
self.panel.image = image
def _sendingCommand(self):
"""
start a while loop that sends 'command' to tello every 5 second
"""
loopTurn=True
if loopTurn:
return
while True:
if loopTurn:
self.tello.send_command('command')
else:
self.tello.send_command('command')
loopTurn= not loopTurn
time.sleep(10)
pass
def _setQuitWaitingFlag(self):
"""
set the variable as TRUE,it will stop computer waiting for response from tello
"""
self.quit_waiting_flag = True
def openCmdWindow(self):
"""
open the cmd window and initial all the button and text
"""
panel = Toplevel(self.root)
panel.wm_title("Command Panel")
# create text input entry
text0 = tki.Label(panel,
text='This Controller map keyboard inputs to Tello control commands\n'
'Adjust the trackbar to reset distance and degree parameter',
font='Helvetica 10 bold'
)
text0.pack(side='top')
text1 = tki.Label(panel, text=
'W - Move Tello Up\t\t\tArrow Up - Move Tello Forward\n'
'S - Move Tello Down\t\t\tArrow Down - Move Tello Backward\n'
'A - Rotate Tello Counter-Clockwise\tArrow Left - Move Tello Left\n'
'D - Rotate Tello Clockwise\t\tArrow Right - Move Tello Right',
justify="left")
text1.pack(side="top")
self.btn_landing = tki.Button(
panel, text="Land", relief="raised", command=self.telloLanding)
self.btn_landing.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
self.btn_takeoff = tki.Button(
panel, text="Takeoff", relief="raised", command=self.telloTakeOff)
self.btn_takeoff.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
# binding arrow keys to drone control
self.tmp_f = tki.Frame(panel, width=100, height=2)
self.tmp_f.bind('<KeyPress-w>', self.on_keypress_w)
self.tmp_f.bind('<KeyPress-s>', self.on_keypress_s)
self.tmp_f.bind('<KeyPress-a>', self.on_keypress_a)
self.tmp_f.bind('<KeyPress-d>', self.on_keypress_d)
self.tmp_f.bind('<KeyPress-Up>', self.on_keypress_up)
self.tmp_f.bind('<KeyPress-Down>', self.on_keypress_down)
self.tmp_f.bind('<KeyPress-Left>', self.on_keypress_left)
self.tmp_f.bind('<KeyPress-Right>', self.on_keypress_right)
self.tmp_f.pack(side="bottom")
self.tmp_f.focus_set()
self.btn_landing = tki.Button(
panel, text="Flip", relief="raised", command=self.openFlipWindow)
self.btn_landing.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
self.distance_bar = Scale(panel, from_=0.02, to=5, tickinterval=0.01, digits=3, label='Distance(m)',
resolution=0.01)
self.distance_bar.set(0.2)
self.distance_bar.pack(side="left")
self.btn_distance = tki.Button(panel, text="Reset Distance", relief="raised",
command=self.updateDistancebar,
)
self.btn_distance.pack(side="left", fill="both",
expand="yes", padx=10, pady=5)
self.degree_bar = Scale(panel, from_=1, to=360, tickinterval=10, label='Degree')
self.degree_bar.set(30)
self.degree_bar.pack(side="right")
self.btn_distance = tki.Button(panel, text="Reset Degree", relief="raised", command=self.updateDegreebar)
self.btn_distance.pack(side="right", fill="both", expand="yes", padx=10, pady=5)
def openFlipWindow(self):
"""
open the flip window and initial all the button and text
"""
panel = Toplevel(self.root)
panel.wm_title("Gesture Recognition")
self.btn_flipl = tki.Button(
panel, text="Flip Left", relief="raised", command=self.telloFlip_l)
self.btn_flipl.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
self.btn_flipr = tki.Button(
panel, text="Flip Right", relief="raised", command=self.telloFlip_r)
self.btn_flipr.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
self.btn_flipf = tki.Button(
panel, text="Flip Forward", relief="raised", command=self.telloFlip_f)
self.btn_flipf.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
self.btn_flipb = tki.Button(
panel, text="Flip Backward", relief="raised", command=self.telloFlip_b)
self.btn_flipb.pack(side="bottom", fill="both",
expand="yes", padx=10, pady=5)
def takeSnapshot(self):
"""
save the current frame of the video as a jpg file and put it into outputpath
"""
print( self.tello.get_battery() )
# grab the current timestamp and use it to construct the filename
ts = datetime.datetime.now()
filename = "./img/{}.jpg".format(ts.strftime("%Y-%m-%d_%H-%M-%S"))
# save the file
self.frame = self.tello.read()
if self.frame is None or self.frame.size == 0:
print("takeSnapshot self.frame is None")
self.frame = cv2.imread('tello.png')
cv2.imwrite(filename, cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR))
print("[INFO] saved {}".format(filename))
def pauseVideo(self):
"""
Toggle the freeze/unfreze of video
"""
if self.btn_pause.config('relief')[-1] == 'sunken':
self.btn_pause.config(relief="raised")
self.tello.video_freeze(False)
else:
self.btn_pause.config(relief="sunken")
self.tello.video_freeze(True)
def telloTakeOff(self):
return self.tello.takeoff()
def telloLanding(self):
return self.tello.land()
def telloFlip_l(self):
return self.tello.flip('l')
def telloFlip_r(self):
return self.tello.flip('r')
def telloFlip_f(self):
return self.tello.flip('f')
def telloFlip_b(self):
return self.tello.flip('b')
def telloCW(self, degree):
return self.tello.rotate_cw(degree)
def telloCCW(self, degree):
return self.tello.rotate_ccw(degree)
def telloMoveForward(self, distance):
return self.tello.move_forward(distance)
def telloMoveBackward(self, distance):
return self.tello.move_backward(distance)
def telloMoveLeft(self, distance):
return self.tello.move_left(distance)
def telloMoveRight(self, distance):
return self.tello.move_right(distance)
def telloUp(self, dist):
return self.tello.move_up(dist)
def telloDown(self, dist):
return self.tello.move_down(dist)
def updateTrackBar(self):
self.my_tello_hand.setThr(self.hand_thr_bar.get())
def updateDistancebar(self):
self.distance = self.distance_bar.get()
print ('reset distance to %.1f' % self.distance)
def updateDegreebar(self):
self.degree = self.degree_bar.get()
print ('reset distance to %d' % self.degree)
def on_keypress_w(self, event):
print ("up %d m" % self.distance)
self.telloUp(self.distance)
def on_keypress_s(self, event):
print( "down %d m" % self.distance)
self.telloDown(self.distance)
def on_keypress_a(self, event):
print( "ccw %d degree" % self.degree)
self.tello.rotate_ccw(self.degree)
def on_keypress_d(self, event):
print ("cw %d m" % self.degree)
self.tello.rotate_cw(self.degree)
def on_keypress_up(self, event):
print("forward %d m" % self.distance)
self.telloMoveForward(self.distance)
def on_keypress_down(self, event):
print ("backward %d m" % self.distance)
self.telloMoveBackward(self.distance)
def on_keypress_left(self, event):
print ("left %d m" % self.distance)
self.telloMoveLeft(self.distance)
def on_keypress_right(self, event):
print ("right %d m" % self.distance)
self.telloMoveRight(self.distance)
def on_keypress_enter(self, event):
if self.frame is not None:
self.registerFace()
self.tmp_f.focus_set()
def onClose(self):
print("[INFO] closing...")
self.stopEvent.set()
del self.tello
self.root.quit()
pass #End of Class TelloUI
4. tello.py
#!/usr/bin/python3
import socket
import threading
import time
import numpy as np
import cv2
## [참고] https://github.com/damiafuentes/DJITelloPy
class Tello:
def __init__(self, local_ip="0.0.0.0", local_command_port=8889, imperial=False, command_timeout=.3,
tello_ip='192.168.10.1', tello_port=8889):
self.abort_flag = False
self.command_timeout = command_timeout
self.imperial = imperial
self.response = None
self.frame = cv2.imread('tello.png') # 기본 이미지 numpy array BGR -- current camera output frame
self.last_frame = self.frame # numpy array BGR -- current camera output frame
self.is_freeze = False # freeze current camera output
udp = socket.getprotobyname('udp')
self.socket_command = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,udp) # socket for receiving video stream
self.socket_video = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,udp) # socket for receiving video stream
self.tello_address = (tello_ip, tello_port)
self.local_video_port = 11111 # port for receiving video stream
self.last_height = 0
self.socket_command.bind((local_ip, local_command_port))
# thread for receiving cmd
self.receive_command_thread = threading.Thread(target=self._receive_command_thread)
self.receive_command_thread.daemon = True
self.receive_command_thread.start()
# thread for receiving video
self.receive_video_thread = threading.Thread(target=self._receive_video_thread)
self.receive_video_thread.daemon = True
self.receive_video_thread.start()
# to receive video -- send cmd: command, streamon
self.socket_command.sendto(b'command', self.tello_address)
print('sent: command')
time.sleep(1)
self.socket_command.sendto(b'streamon', self.tello_address)
print('sent: streamon')
def __del__(self):
self.socket_command.close()
def read(self):
if self.is_freeze:
return self.last_frame
else:
return self.frame
def video_freeze(self, is_freeze=True):
self.is_freeze = is_freeze
if is_freeze:
self.last_frame = self.frame
def _receive_command_thread(self):
while True:
try:
self.response, ip = self.socket_command.recvfrom(3000)
except socket.error as exc:
print ("Caught exception socket.error : %s" % exc)
def _receive_video_thread(self):
"""
Listens for video streaming (raw h264) from the Tello.
Runs as a thread, sets self.frame to the most recent frame Tello captured.
"""
self.video = cv2.VideoCapture('udp://@0.0.0.0:11111', cv2.CAP_FFMPEG)
time.sleep(3)
while True:
if not self.video.isOpened():
self.video.open('udp://0.0.0.0:11111')
time.sleep(0.5)
else:
break
while True:
try:
if not self.video.isOpened():
print('Try to open video stream....')
self.video.open('udp://0.0.0.0:11111')
time.sleep(0.5)
continue
ret, frame = self.video.read()
if ret:
self.frame=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#self.frame= frame ## BGR
except Exception as err:
print(err)
def send_command(self, command):
print (">> send cmd: {}".format(command))
self.abort_flag = False
timer = threading.Timer(self.command_timeout, self.set_abort_flag)
self.socket_command.sendto(command.encode('utf-8'), self.tello_address)
timer.start()
while self.response is None:
if self.abort_flag is True:
break
timer.cancel()
if self.response is None:
response = 'none_response'
else:
response = self.response.decode('utf-8')
self.response = None
return response
def set_abort_flag(self):
self.abort_flag = True
pass
def takeoff(self):
return self.send_command('takeoff')
def set_speed(self, speed):
"""
Sets speed.
This method expects KPH or MPH. The Tello API expects speeds from 1 to 100 centimeters/second.
Metric: .1 to 3.6 KPH
Imperial: .1 to 2.2 MPH
Args:
speed (int|float): Speed.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
speed = float(speed)
if self.imperial is True:
speed = int(round(speed * 44.704))
else:
speed = int(round(speed * 27.7778))
return self.send_command('speed %s' % speed)
def rotate_cw(self, degrees):
"""
Rotates clockwise.
Args:
degrees (int): Degrees to rotate, 1 to 360.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('cw %s' % degrees)
def rotate_ccw(self, degrees):
"""
Rotates counter-clockwise.
Args:
degrees (int): Degrees to rotate, 1 to 360.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('ccw %s' % degrees)
def flip(self, direction):
"""
Flips.
Args:
direction (str): Direction to flip, 'l', 'r', 'f', 'b'.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('flip %s' % direction)
def get_response(self):
"""
Returns response of tello.
Returns:
int: response of tello.
"""
response = self.response
return response
def get_height(self):
"""Returns height(dm) of tello.
Returns:
int: Height(dm) of tello.
"""
height = self.send_command('height?')
height = str(height)
height = filter(str.isdigit, height)
try:
height = int(height)
self.last_height = height
except:
height = self.last_height
pass
return height
def get_battery(self):
"""Returns percent battery life remaining.
Returns:
int: Percent battery life remaining.
"""
battery = self.send_command('battery?')
try:
battery = int(battery)
except:
pass
return battery
def get_flight_time(self):
"""Returns the number of seconds elapsed during flight.
Returns:
int: Seconds elapsed during flight.
"""
flight_time = self.send_command('time?')
try:
flight_time = int(flight_time)
except:
pass
return flight_time
def get_speed(self):
"""Returns the current speed.
Returns:
int: Current speed in KPH or MPH.
"""
speed = self.send_command('speed?')
try:
speed = float(speed)
if self.imperial is True:
speed = round((speed / 44.704), 1)
else:
speed = round((speed / 27.7778), 1)
except:
pass
return speed
def land(self):
"""Initiates landing.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('land')
def move(self, direction, distance):
"""Moves in a direction for a distance.
This method expects meters or feet. The Tello API expects distances
from 20 to 500 centimeters.
Metric: .02 to 5 meters
Imperial: .7 to 16.4 feet
Args:
direction (str): Direction to move, 'forward', 'back', 'right' or 'left'.
distance (int|float): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
distance = float(distance)
if self.imperial is True:
distance = int(round(distance * 30.48))
else:
distance = int(round(distance * 100))
return self.send_command('%s %s' % (direction, distance))
def move_backward(self, distance):
"""Moves backward for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('back', distance)
def move_down(self, distance):
"""Moves down for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('down', distance)
def move_forward(self, distance):
"""Moves forward for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('forward', distance)
def move_left(self, distance):
"""Moves left for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('left', distance)
def move_right(self, distance):
"""Moves right for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
"""
return self.move('right', distance)
def move_up(self, distance):
"""Moves up for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('up', distance)
pass #End of Class Tello
반응형