카테고리 없음

Tello Drone Python UI 프로그램

Wood Pecker 2021. 8. 3. 13:33

1. Python GUI 프로그램

기존 2.x 버전의 SDK 예제 GUI 프로그램을 3.x로 변경한 것이다. 이 프로그램은 라이브러리 문제와 문법 차이가 있어 바로 3.x 파이썬에서 실행되지 않는다. 3.x 버전의 GUI 예제 프로그램을 검색으로 찾지를 못하여 기존 2.x 버전의 GUI 프로그램을 3.x로 수정 변경하였다.   [참고]https://github.com/damiafuentes/DJITelloPy

 

2. main.py

import tello
from tello_control_ui import TelloUI

def main():
    drone = tello.Tello('0.0.0.0', 8889)
    vplayer = TelloUI(drone,"./img/")
    # start the Tkinter mainloop
    vplayer.root.mainloop() 

if __name__ == "__main__":
    main()

 

3. tello_control_ui.py

#!/usr/bin/python3
import os
from PIL import Image
from PIL import ImageTk
#import Tkinter as tki  ==> import tkinter as tki
import tkinter as tki

#from Tkinter import Toplevel, Scale ==>from tkinter import Toplevel, Scale
from tkinter import Toplevel, Scale

import threading
import datetime
import cv2  # pip3 install opencv-python --upgrade --force-reinstall
import time
import platform

class TelloUI:
    def __init__(self,tello,outputpath):
        """
        Initial all the element of the GUI,support by Tkinter
        :param tello: class interacts with the Tello drone.
        Raises:
            RuntimeError: If the Tello rejects the attempt to enter command mode.
        """        
        self.tello = tello # videostream device
        self.outputPath = outputpath # the path that save pictures created by clicking the takeSnapshot button
        #self.frame = None  # frame read from h264decoder and used for pose recognition
        self.frame = cv2.imread('tello.png')

        self.thread = None # thread of the Tkinter mainloop
        self.stopEvent = None  

        # control variables
        self.distance = 0.1  # default distance for 'move' cmd
        self.degree = 30  # default degree for 'cw' or 'ccw' cmd

        # if the flag is TRUE,the auto-takeoff thread will stop waiting for the response from tello
        self.quit_waiting_flag = False

        # initialize the root window and image panel
        self.root = tki.Tk()
        self.panel = None

        # create buttons
        self.btn_snapshot = tki.Button(self.root, text="Snapshot!", command=self.takeSnapshot)
        self.btn_snapshot.pack(side="bottom", fill="both",  expand="yes", padx=10, pady=5)
        self.btn_pause = tki.Button(self.root, text="Pause", relief="raised", command=self.pauseVideo)
        self.btn_pause.pack(side="bottom", fill="both", expand="yes", padx=10, pady=5)
        self.btn_landing = tki.Button(self.root, text="Open Command Panel", relief="raised", command=self.openCmdWindow)
        self.btn_landing.pack(side="bottom", fill="both",  expand="yes", padx=10, pady=5)

        # start a thread that constantly pools the video sensor for
        # the most recently read frame
        self.stopEvent = threading.Event()
        self.thread = threading.Thread(target=self.videoLoop, args=())
        self.thread.start()

        # set a callback to handle when the window is closed
        self.root.wm_title("TELLO Controller")
        self.root.wm_protocol("WM_DELETE_WINDOW", self.onClose)

        # the sending_command will send command to tello every 5 seconds
        self.sending_command_thread = threading.Thread(target = self._sendingCommand)

    def videoLoop(self):
        """
        The mainloop thread of Tkinter 
        Raises:
            RuntimeError: To get around a RunTime error that Tkinter throws due to threading.
        """
        try:
            # start the thread that get GUI image and drwa skeleton 
            time.sleep(0.5)
            self.sending_command_thread.start()
            system = platform.system()
            while not self.stopEvent.is_set():
                # read the frame for GUI show
                self.frame = self.tello.read()
                # we found compatibility problem between Tkinter,PIL and Macos,and it will
                # sometimes result the very long preriod of the "ImageTk.PhotoImage" function,
                # so for Macos,we start a new thread to execute the _updateGUIImage function.
                if self.frame is None or self.frame.size == 0:
                    #print("videoLoop self.frame is None")
                    # transfer the format from frame to image
                    continue
                else:
                    image = Image.fromarray(self.frame)

                if system =="Windows" or system =="Linux":
                    self._updateGUIImage(image)
                else:
                    thread_tmp = threading.Thread(target=self._updateGUIImage,args=(image,))
                    thread_tmp.start()
                    time.sleep(0.03)                                                            
        except Exception as e:
            print("[INFO] caught a RuntimeError"+str(e))


    def _updateGUIImage(self,image):
        """
        Main operation to initial the object of image,and update the GUI panel 
        """
        if image is None:
            return

        image = ImageTk.PhotoImage(image)
        # if the panel none ,we need to initial it
        if self.panel is None:
            self.panel = tki.Label(image=image)
            self.panel.image = image
            self.panel.pack(side="left", padx=10, pady=10)
        # otherwise, simply update the panel
        else:
            self.panel.configure(image=image)
            self.panel.image = image

    def _sendingCommand(self):
        """
        start a while loop that sends 'command' to tello every 5 second
        """
        loopTurn=True
        if loopTurn:
            return
        while True:
            if loopTurn:
                self.tello.send_command('command')
            else:
                self.tello.send_command('command')
            loopTurn= not loopTurn
            time.sleep(10)
            pass

    def _setQuitWaitingFlag(self):  
        """
        set the variable as TRUE,it will stop computer waiting for response from tello  
        """       
        self.quit_waiting_flag = True        

    def openCmdWindow(self):
        """
        open the cmd window and initial all the button and text
        """        
        panel = Toplevel(self.root)
        panel.wm_title("Command Panel")

        # create text input entry
        text0 = tki.Label(panel,
                          text='This Controller map keyboard inputs to Tello control commands\n'
                               'Adjust the trackbar to reset distance and degree parameter',
                          font='Helvetica 10 bold'
                          )
        text0.pack(side='top')

        text1 = tki.Label(panel, text=
                          'W - Move Tello Up\t\t\tArrow Up - Move Tello Forward\n'
                          'S - Move Tello Down\t\t\tArrow Down - Move Tello Backward\n'
                          'A - Rotate Tello Counter-Clockwise\tArrow Left - Move Tello Left\n'
                          'D - Rotate Tello Clockwise\t\tArrow Right - Move Tello Right',
                          justify="left")
        text1.pack(side="top")

        self.btn_landing = tki.Button(
            panel, text="Land", relief="raised", command=self.telloLanding)
        self.btn_landing.pack(side="bottom", fill="both",
                              expand="yes", padx=10, pady=5)

        self.btn_takeoff = tki.Button(
            panel, text="Takeoff", relief="raised", command=self.telloTakeOff)
        self.btn_takeoff.pack(side="bottom", fill="both",
                              expand="yes", padx=10, pady=5)

        # binding arrow keys to drone control
        self.tmp_f = tki.Frame(panel, width=100, height=2)
        self.tmp_f.bind('<KeyPress-w>', self.on_keypress_w)
        self.tmp_f.bind('<KeyPress-s>', self.on_keypress_s)
        self.tmp_f.bind('<KeyPress-a>', self.on_keypress_a)
        self.tmp_f.bind('<KeyPress-d>', self.on_keypress_d)
        self.tmp_f.bind('<KeyPress-Up>', self.on_keypress_up)
        self.tmp_f.bind('<KeyPress-Down>', self.on_keypress_down)
        self.tmp_f.bind('<KeyPress-Left>', self.on_keypress_left)
        self.tmp_f.bind('<KeyPress-Right>', self.on_keypress_right)
        self.tmp_f.pack(side="bottom")
        self.tmp_f.focus_set()

        self.btn_landing = tki.Button(
            panel, text="Flip", relief="raised", command=self.openFlipWindow)
        self.btn_landing.pack(side="bottom", fill="both",
                              expand="yes", padx=10, pady=5)

        self.distance_bar = Scale(panel, from_=0.02, to=5, tickinterval=0.01, digits=3, label='Distance(m)',
                                  resolution=0.01)
        self.distance_bar.set(0.2)
        self.distance_bar.pack(side="left")

        self.btn_distance = tki.Button(panel, text="Reset Distance", relief="raised",
                                       command=self.updateDistancebar,
                                       )
        self.btn_distance.pack(side="left", fill="both",
                               expand="yes", padx=10, pady=5)

        self.degree_bar = Scale(panel, from_=1, to=360, tickinterval=10, label='Degree')
        self.degree_bar.set(30)
        self.degree_bar.pack(side="right")

        self.btn_distance = tki.Button(panel, text="Reset Degree", relief="raised", command=self.updateDegreebar)
        self.btn_distance.pack(side="right", fill="both",  expand="yes", padx=10, pady=5)

    def openFlipWindow(self):
        """
        open the flip window and initial all the button and text
        """
        panel = Toplevel(self.root)
        panel.wm_title("Gesture Recognition")

        self.btn_flipl = tki.Button(
            panel, text="Flip Left", relief="raised", command=self.telloFlip_l)
        self.btn_flipl.pack(side="bottom", fill="both",
                            expand="yes", padx=10, pady=5)

        self.btn_flipr = tki.Button(
            panel, text="Flip Right", relief="raised", command=self.telloFlip_r)
        self.btn_flipr.pack(side="bottom", fill="both",
                            expand="yes", padx=10, pady=5)

        self.btn_flipf = tki.Button(
            panel, text="Flip Forward", relief="raised", command=self.telloFlip_f)
        self.btn_flipf.pack(side="bottom", fill="both",
                            expand="yes", padx=10, pady=5)

        self.btn_flipb = tki.Button(
            panel, text="Flip Backward", relief="raised", command=self.telloFlip_b)
        self.btn_flipb.pack(side="bottom", fill="both",
                            expand="yes", padx=10, pady=5)

    def takeSnapshot(self):
        """
        save the current frame of the video as a jpg file and put it into outputpath
        """
        print( self.tello.get_battery() )
        # grab the current timestamp and use it to construct the filename
        ts = datetime.datetime.now()
        filename = "./img/{}.jpg".format(ts.strftime("%Y-%m-%d_%H-%M-%S"))
        # save the file
        self.frame = self.tello.read()
        if self.frame is None or self.frame.size == 0:
            print("takeSnapshot self.frame is None")
            self.frame = cv2.imread('tello.png')
        cv2.imwrite(filename, cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR))
        print("[INFO] saved {}".format(filename))

    def pauseVideo(self):
        """
        Toggle the freeze/unfreze of video
        """
        if self.btn_pause.config('relief')[-1] == 'sunken':
            self.btn_pause.config(relief="raised")
            self.tello.video_freeze(False)
        else:
            self.btn_pause.config(relief="sunken")
            self.tello.video_freeze(True)

    def telloTakeOff(self):
        return self.tello.takeoff()                

    def telloLanding(self):
        return self.tello.land()

    def telloFlip_l(self):
        return self.tello.flip('l')

    def telloFlip_r(self):
        return self.tello.flip('r')

    def telloFlip_f(self):
        return self.tello.flip('f')

    def telloFlip_b(self):
        return self.tello.flip('b')

    def telloCW(self, degree):
        return self.tello.rotate_cw(degree)

    def telloCCW(self, degree):
        return self.tello.rotate_ccw(degree)

    def telloMoveForward(self, distance):
        return self.tello.move_forward(distance)

    def telloMoveBackward(self, distance):
        return self.tello.move_backward(distance)

    def telloMoveLeft(self, distance):
        return self.tello.move_left(distance)

    def telloMoveRight(self, distance):
        return self.tello.move_right(distance)

    def telloUp(self, dist):
        return self.tello.move_up(dist)

    def telloDown(self, dist):
        return self.tello.move_down(dist)

    def updateTrackBar(self):
        self.my_tello_hand.setThr(self.hand_thr_bar.get())

    def updateDistancebar(self):
        self.distance = self.distance_bar.get()
        print ('reset distance to %.1f' % self.distance)

    def updateDegreebar(self):
        self.degree = self.degree_bar.get()
        print ('reset distance to %d' % self.degree)

    def on_keypress_w(self, event):
        print ("up %d m" % self.distance)
        self.telloUp(self.distance)

    def on_keypress_s(self, event):
        print( "down %d m" % self.distance)
        self.telloDown(self.distance)

    def on_keypress_a(self, event):
        print( "ccw %d degree" % self.degree)
        self.tello.rotate_ccw(self.degree)

    def on_keypress_d(self, event):
        print ("cw %d m" % self.degree)
        self.tello.rotate_cw(self.degree)

    def on_keypress_up(self, event):
        print("forward %d m" % self.distance)
        self.telloMoveForward(self.distance)

    def on_keypress_down(self, event):
        print ("backward %d m" % self.distance)
        self.telloMoveBackward(self.distance)

    def on_keypress_left(self, event):
        print ("left %d m" % self.distance)
        self.telloMoveLeft(self.distance)

    def on_keypress_right(self, event):
        print ("right %d m" % self.distance)
        self.telloMoveRight(self.distance)

    def on_keypress_enter(self, event):
        if self.frame is not None:
            self.registerFace()
        self.tmp_f.focus_set()

    def onClose(self):
        print("[INFO] closing...")
        self.stopEvent.set()
        del self.tello
        self.root.quit()
pass #End of Class TelloUI

 

4. tello.py

#!/usr/bin/python3
import socket
import threading
import time
import numpy as np
import cv2

## [참고] https://github.com/damiafuentes/DJITelloPy
class Tello:
    def __init__(self, local_ip="0.0.0.0", local_command_port=8889, imperial=False, command_timeout=.3,
                 tello_ip='192.168.10.1',  tello_port=8889):
        self.abort_flag = False
        self.command_timeout = command_timeout
        self.imperial = imperial
        self.response = None
        self.frame = cv2.imread('tello.png')  # 기본 이미지  numpy array BGR -- current camera output frame
        self.last_frame = self.frame          # numpy array BGR -- current camera output frame
        self.is_freeze = False  # freeze current camera output
        udp = socket.getprotobyname('udp')
        self.socket_command = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,udp)  # socket for receiving video stream
        self.socket_video = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,udp)  # socket for receiving video stream
        self.tello_address = (tello_ip, tello_port)

        self.local_video_port = 11111  # port for receiving video stream
        self.last_height = 0
        self.socket_command.bind((local_ip, local_command_port))

        # thread for receiving cmd
        self.receive_command_thread = threading.Thread(target=self._receive_command_thread)
        self.receive_command_thread.daemon = True
        self.receive_command_thread.start()

        # thread for receiving video
        self.receive_video_thread = threading.Thread(target=self._receive_video_thread)
        self.receive_video_thread.daemon = True
        self.receive_video_thread.start()

        # to receive video -- send cmd: command, streamon
        self.socket_command.sendto(b'command', self.tello_address)
        print('sent: command')
        time.sleep(1)
        self.socket_command.sendto(b'streamon', self.tello_address)
        print('sent: streamon')

    def __del__(self):
        self.socket_command.close()

    def read(self):
        if self.is_freeze:
            return self.last_frame
        else:
            return self.frame

    def video_freeze(self, is_freeze=True):
        self.is_freeze = is_freeze
        if is_freeze:
            self.last_frame = self.frame

    def _receive_command_thread(self):
        while True:
            try:
                self.response, ip = self.socket_command.recvfrom(3000)
            except socket.error as exc:
                print ("Caught exception socket.error : %s" % exc)

    def _receive_video_thread(self):
        """
        Listens for video streaming (raw h264) from the Tello.
        Runs as a thread, sets self.frame to the most recent frame Tello captured.
        """
        self.video = cv2.VideoCapture('udp://@0.0.0.0:11111', cv2.CAP_FFMPEG)
        time.sleep(3)
        while True:
            if not self.video.isOpened():
                self.video.open('udp://0.0.0.0:11111')
                time.sleep(0.5)
            else:
                break

        while True:
            try:
                if not self.video.isOpened():
                    print('Try to open video stream....')
                    self.video.open('udp://0.0.0.0:11111')
                    time.sleep(0.5)
                    continue
                ret, frame = self.video.read()
                if ret:
                    self.frame=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    #self.frame= frame  ## BGR
            except Exception as err:
                print(err)

    def send_command(self, command):
        print (">> send cmd: {}".format(command))
        self.abort_flag = False
        timer = threading.Timer(self.command_timeout, self.set_abort_flag)
        self.socket_command.sendto(command.encode('utf-8'), self.tello_address)
        timer.start()
        while self.response is None:
            if self.abort_flag is True:
                break
        timer.cancel()
        if self.response is None:
            response = 'none_response'
        else:
            response = self.response.decode('utf-8')
        self.response = None
        return response

    def set_abort_flag(self):
        self.abort_flag = True
        pass

    def takeoff(self):
        return self.send_command('takeoff')

    def set_speed(self, speed):
        """
        Sets speed.
        This method expects KPH or MPH. The Tello API expects speeds from 1 to 100 centimeters/second.
        Metric: .1 to 3.6 KPH
        Imperial: .1 to 2.2 MPH
        Args:
            speed (int|float): Speed.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        speed = float(speed)
        if self.imperial is True:
            speed = int(round(speed * 44.704))
        else:
            speed = int(round(speed * 27.7778))

        return self.send_command('speed %s' % speed)

    def rotate_cw(self, degrees):
        """
        Rotates clockwise.
        Args:
            degrees (int): Degrees to rotate, 1 to 360.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('cw %s' % degrees)

    def rotate_ccw(self, degrees):
        """
        Rotates counter-clockwise.
        Args:
            degrees (int): Degrees to rotate, 1 to 360.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('ccw %s' % degrees)

    def flip(self, direction):
        """
        Flips.
        Args:
            direction (str): Direction to flip, 'l', 'r', 'f', 'b'.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.send_command('flip %s' % direction)

    def get_response(self):
        """
        Returns response of tello.
        Returns:
            int: response of tello.
        """
        response = self.response
        return response

    def get_height(self):
        """Returns height(dm) of tello.
        Returns:
            int: Height(dm) of tello.
        """
        height = self.send_command('height?')
        height = str(height)
        height = filter(str.isdigit, height)
        try:
            height = int(height)
            self.last_height = height
        except:
            height = self.last_height
            pass
        return height

    def get_battery(self):
        """Returns percent battery life remaining.
        Returns:
            int: Percent battery life remaining.
        """
        battery = self.send_command('battery?')
        try:
            battery = int(battery)
        except:
            pass
        return battery

    def get_flight_time(self):
        """Returns the number of seconds elapsed during flight.
        Returns:
            int: Seconds elapsed during flight.
        """
        flight_time = self.send_command('time?')
        try:
            flight_time = int(flight_time)
        except:
            pass
        return flight_time

    def get_speed(self):
        """Returns the current speed.
        Returns:
            int: Current speed in KPH or MPH.
        """
        speed = self.send_command('speed?')
        try:
            speed = float(speed)
            if self.imperial is True:
                speed = round((speed / 44.704), 1)
            else:
                speed = round((speed / 27.7778), 1)
        except:
            pass
        return speed

    def land(self):
        """Initiates landing.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.send_command('land')

    def move(self, direction, distance):
        """Moves in a direction for a distance.
        This method expects meters or feet. The Tello API expects distances
        from 20 to 500 centimeters.
        Metric: .02 to 5 meters
        Imperial: .7 to 16.4 feet
        Args:
            direction (str): Direction to move, 'forward', 'back', 'right' or 'left'.
            distance (int|float): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        distance = float(distance)
        if self.imperial is True:
            distance = int(round(distance * 30.48))
        else:
            distance = int(round(distance * 100))

        return self.send_command('%s %s' % (direction, distance))

    def move_backward(self, distance):
        """Moves backward for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """

        return self.move('back', distance)

    def move_down(self, distance):
        """Moves down for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('down', distance)

    def move_forward(self, distance):
        """Moves forward for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('forward', distance)

    def move_left(self, distance):
        """Moves left for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('left', distance)

    def move_right(self, distance):
        """Moves right for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        """
        return self.move('right', distance)

    def move_up(self, distance):
        """Moves up for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('up', distance)
pass #End of Class Tello
반응형