Скрипт Aruco_Flight¶
Скрипт Aruco_flight служит для изучения в Python: многопроцессорности, работы с буфером данных, функции распознавания Aruco-маркеров и работы с ними, а также с методами библиотеки pioneer_sdk. Перед выполнением скрипта необходимо произвести калибровку камеры c помощью скрипта Camera_calibration.
Ссылка на github: https://github.com/geoscan/pioneer_sdk/tree/master/camera_calibration
Как работает алгоритм?¶
Для корректной работы одновременного отслеживания маркера и перемещения дрона необходимо разделить две эти задачи на два отдельных процесса. В основном потоке создаём 2 параллельных процесса, которые общаются между собой через буфер. Один процесс отвечает за обнаружение маркера и чтение необходимой информации, а другой за перемещение квадрокоптера. Процесс pos_and_orient – отвечает за работу с маркером а drone_flight – за управление квадрокоптером.
При нажатии клавиши Escape, которое отлеживается в Producer процессе, буфер закрывается, следовательно завершается обмен данными и коптер заходит на посадку.
Расчётная часть алгоритма.¶
Для следования квадрокоптера за маркером необходимо учитывать несколько систем координат: от начальной точки взлёта, текущее положение квадрокоптера, положение камеры, положение самого маркера, а также проекция от него, чтобы держаться на определённом растяни от маркера.
x_camera = t_vec.item(0) |
y_camera = t_vec.item(1) |
z_camera = t_vec.item(2) |
yaw_camera = math.radians(r_vec_euler.item(1)) |
c_yaw_prev = math.cos(command_yaw) |
s_yaw_prev = math.sin(command_yaw) |
command_yaw -= k_p_yaw * yaw_camera |
c_yaw_comm = math.cos(command_yaw) |
s_yaw_comm = math.sin(command_yaw) |
command_x += k_p_xy*(x_camera*c_yaw_prev-z_camera*s_yaw_prev+distance*s_yaw_comm) |
command_y += k_p_xy*(x_camera * s_yaw_prev + z_camera * c_yaw_prev - distance * c_yaw_comm) |
Разбор скрипта¶
Импортируем необходимые библиотеки и определяем их назначение:
from pioneer_sdk import Pioneer, Camera – класс Pioneer из pioneer_sdk, отвечающий за взаимодействие с коптером и класс Camera для работы c камерой; NumPy – библиотека для работы с массивами данных; Cv2 – библиотека машинного зрения;import cv2 from pioneer_sdk import Pioneer, Camera import numpy as np
Создаём функцию load_coefficients(path) для работы с параметрами камеры.
def load_coefficients(path): cv_file = cv2.FileStorage(path, cv2.FILE_STORAGE_READ) camera_matrix = cv_file.getNode("mtx").mat() dist_coeffs = cv_file.getNode("dist").mat() cv_file.release() return camera_matrix, dist_coeffs
Далее используем конструкцию if __name__ == „__main__“:, которая является точкой входа в программу. Всё, что идёт до этого условия, выполнятся всегда: и при вызове в качестве модуля и при вызове, как исполняемый файл. Подробное описание данной конструкции
В переменную camera_frame передаём изображение от квадрокоптера:
cv2.imdecode(buf, flag) – чтение изображения из указного массива, где:buf – читаемый массив;flag – тип изображения. np.frombuffer(buffer,dtype) - интерпретирует буфер как одномерный массив, где:buffer - буфер-подобный объект;dtype - тип данных NumPy. pioneer_sdk.get_raw_video_frame() – возвращает массив байт представляющий собой jpg картинку.Выходит, следующая строчка:
camera_frame = cv2.imdecode(np.frombuffer(drone.get_raw_video_frame(), dtype=np.uint8), cv2.IMREAD_COLOR)
Переводим изображение в чёрно белый формат. Создаём маску.
gray = cv2.cvtColor(camera_frame, cv2.COLOR_BGR2GRAY)И возвращаем 3 значение из метода обнаружения маркеров aruco.detectMarkers(gray, aruco_dict, patameters=aruco_parameters), где:
corners – углы;ids – номер маркера;rejected_ing_points – наличие лишних тэгов.corners, ids, rejected_img_points = aruco.detectMarkers(gray, aruco_dict, parameters=aruco_parameters)
Условием if np.all(ids is not None): проверяем есть ли в аргументе ids элементы в значении True, т.е. видим ли мы тэг:
if np.all(ids is not None):
Получаем оси системы координат, связанной с центром маркера, и выводим их в две переменные: r_vec_rodrigues и t_vec
r_vec_rodrigues, t_vec, _ = aruco.estimatePoseSingleMarkers(corners, size_of_marker, camera_mtx,camera_dist)
Рисуем на изображении полученные оси системы координат, связанной с центром маркера и обводим маркер:
aruco.drawAxis(camera_frame, camera_mtx, camera_dist, r_vec_rodrigues, t_vec, 0.01) aruco.drawDetectedMarkers(camera_frame, corners)
Приводим полученный вектор ориентации к вектору углов Эйлера:
r_mat = cv2.Rodrigues(r_vec_rodrigues)[0] p = np.hstack((r_mat.reshape(3, 3), t_vec.reshape(3, 1))) r_vec_euler = cv2.decomposeProjectionMatrix(p)[6]
Так как очередь используется из одного элемента, то мы проверяем ее на заполненность, если да, то стираем старый элемент и записываем новый:
if buff.full(): buff.get() buff.put([t_vec, r_vec_euler])
Если обнаружения тэга не происходит, то просто продолжаем выполнение процесса:
except cv2.error: continue
Выводим изображение:
cv2.imshow('marker_detection', camera_frame)
Создаём обработку нажатия клавиш и нажатии на Esc закрываем окно с изображением, закрываем буфер аргументом ‘end’ и выходим из функции.
key = cv2.waitKey(1) if key == 27: # esc print('esc pressed') cv2.destroyAllWindows() if buff.full(): buff.get() buff.put(['end']) break
Создаём функцию перемещения квадрокоптера и ряд переменных:
def drone_control(buff, drone): command_x = float(0) command_y = float(0) command_z = float(1) # initial flight height command_yaw = math.radians(float(0)) k_p_xy = 0.6 k_p_z = 0.6 k_p_yaw = 0.4 distance = 0.5 t_vec = None r_vec_euler = None new_point = True new_message = True p_r = False
Далее запускаем бесконечный цикл, в котором задаём ряд условий.
while True:
Если получаем новую точку или новое сообщение, то перемещаемся в актуальные точки и сбрасываем флаги.
if new_point and new_message: drone.go_to_local_point(x=command_x, y=command_y, z=command_z, yaw=command_yaw) new_point = False new_message = False
Если в буфере есть какое-либо значение, то получаем его. Если длинна сообщения 1 и его содержимое ‘end’, то завершаем процесс. В противном случае переменой t_vec и r_vec_euler присваиваем матрицы получение из буфера. Выставляем флаг new_massage.
if not buff.empty(): message = buff.get() if len(message) == 1 and message[0] == 'end': break t_vec = message[0] r_vec_euler = message[1] new_message = True
Если достигнута точка, то выставляем флаг на перерасчёт координат.
if drone.point_reached(): p_r = True
Если сработал флаг на перерасчёт координат и t_vec с t_vec_euler не пустые, то происходит расчёт координат:
if p_r and t_vec is not None and r_vec_euler is not None: x_camera = t_vec.item(0) y_camera = t_vec.item(1) z_camera = t_vec.item(2) yaw_camera = math.radians(r_vec_euler.item(1)) c_yaw_prev = math.cos(command_yaw) s_yaw_prev = math.sin(command_yaw) command_yaw -= k_p_yaw * yaw_camera c_yaw_comm = math.cos(command_yaw) s_yaw_comm = math.sin(command_yaw) command_x += k_p_xy*(x_camera*c_yaw_prev-z_camera*s_yaw_prev+distance*s_yaw_comm) command_y += k_p_xy*(x_camera * s_yaw_prev + z_camera * c_yaw_prev - distance * c_yaw_comm) command_z -= k_p_z * y_camera new_point = True p_r = False
Далее используем конструкцию if __name__ == „__main__“:, которая является точкой входа в программу, и сообщаем о запуске скрипта. Всё, что идёт до этого условия, выполнятся всегда: и при вызове в качестве модуля и при вызове, как исполняемый файл.
if __name__ == '__main__': print('start')Подробное описание данной конструкции: https://docs.python.org/3/library/__main__.html
Внутри конструкции try загружаем калибровочный файл yaml, заранее созданный в скрипте get_camera_samples.
try: # change if calibration_matrix.yaml file is located in not default location calibration_file = open(os.path.join(os.getcwd(), '..', "camera_calibration", "result", "calibration_matrix.yaml")) parsed_calibration_file = yaml.load(calibration_file, Loader=yaml.FullLoader) mtx = np.array(parsed_calibration_file.get("camera_matrix")) dist = np.array(parsed_calibration_file.get("dist_coeff"))
Если калибровочного файла нет, то пишем о необходимости произвести калибровку и завершаем выполнение скрипта:
except FileNotFoundError: print('Сan not find calibration data, please run get_camera_samples.py script from camera calibration folder') sys.exit(0)
Необходимо воспользоваться конструкцией manager, т.к. просто экземпляр класса нельзя передавать в буфере. Помимо этого, запускаем моторы и взлетаем.
BaseManager.register('Pioneer', Pioneer) manager = BaseManager() manager.start() pioneer_mini = manager.Pioneer() pioneer_mini.arm() pioneer_mini.takeoff()
Создаём буфер обмена данными и мульти процессы, исполнительными функциями которых являются ранее созданные image_proc и drone_control запускам их.
buffer = mp.Queue(maxsize=1) pos_and_orient = mp.Process(target=image_proc, args=(buffer, pioneer_mini, mtx, dist)) drone_flight = mp.Process(target=drone_control, args=(buffer, pioneer_mini)) pos_and_orient.start() drone_flight.start()
Как только выполнение процессов завершено, то они прикрепляются обратно к основному потоку для их корректного закрытия:
pos_and_orient.join() drone_flight.join() pioneer_mini.land()