Скрипт Aruco_Flight

Скрипт Aruco_flight служит для изучения в Python: многопроцессорности, работы с буфером данных, функции распознавания Aruco-маркеров и работы с ними, а также с методами библиотеки pioneer_sdk. Перед выполнением скрипта необходимо произвести калибровку камеры c помощью скрипта Camera_calibration.

Ссылка на github: https://github.com/geoscan/pioneer_sdk/tree/master/camera_calibration

Как работает алгоритм?

Для корректной работы одновременного отслеживания маркера и перемещения дрона необходимо разделить две эти задачи на два отдельных процесса. В основном потоке создаём 2 параллельных процесса, которые общаются между собой через буфер. Один процесс отвечает за обнаружение маркера и чтение необходимой информации, а другой за перемещение квадрокоптера. Процесс pos_and_orient – отвечает за работу с маркером а drone_flight – за управление квадрокоптером.

https://storage.yandexcloud.net/pioneer-doc.geoscan.ru-static/images/programming/python/pio_sdk/image1.PNG

Рисунок 1 - Принципиальная схема работы алгоритма

При нажатии клавиши Escape, которое отлеживается в Producer процессе, буфер закрывается, следовательно завершается обмен данными и коптер заходит на посадку.

https://storage.yandexcloud.net/pioneer-doc.geoscan.ru-static/images/programming/python/pio_sdk/image2.PNG

Рисунок 2 - Распределение потоков

Расчётная часть алгоритма.

Для следования квадрокоптера за маркером необходимо учитывать несколько систем координат: от начальной точки взлёта, текущее положение квадрокоптера, положение камеры, положение самого маркера, а также проекция от него, чтобы держаться на определённом растяни от маркера.

https://storage.yandexcloud.net/pioneer-doc.geoscan.ru-static/images/programming/python/pio_sdk/image3.PNG

Рисунок 3 - Оси квадрокоптера и маркера

x_camera = t_vec.item(0)

y_camera = t_vec.item(1)

z_camera = t_vec.item(2)

yaw_camera = math.radians(r_vec_euler.item(1))

c_yaw_prev = math.cos(command_yaw)

s_yaw_prev = math.sin(command_yaw)

command_yaw -= k_p_yaw * yaw_camera

c_yaw_comm = math.cos(command_yaw)

s_yaw_comm = math.sin(command_yaw)

command_x += k_p_xy*(x_camera*c_yaw_prev-z_camera*s_yaw_prev+distance*s_yaw_comm)

command_y += k_p_xy*(x_camera * s_yaw_prev + z_camera * c_yaw_prev - distance * c_yaw_comm)

Разбор скрипта

  1. Импортируем необходимые библиотеки и определяем их назначение:

 import cv2
 from pioneer_sdk import Pioneer, Camera
 import numpy as np
  1. Создаём функцию load_coefficients(path) для работы с параметрами камеры.

def load_coefficients(path):
 cv_file = cv2.FileStorage(path, cv2.FILE_STORAGE_READ)

 camera_matrix = cv_file.getNode("mtx").mat()
 dist_coeffs = cv_file.getNode("dist").mat()

 cv_file.release()
 return camera_matrix, dist_coeffs
  1. Далее используем конструкцию if __name__ == „__main__“:, которая является точкой входа в программу. Всё, что идёт до этого условия, выполнятся всегда: и при вызове в качестве модуля и при вызове, как исполняемый файл. Подробное описание данной конструкции

  2. В переменную camera_frame передаём изображение от квадрокоптера:

  • cv2.imdecode(buf, flag) – чтение изображения из указного массива, где:
    buf – читаемый массив;
    flag – тип изображения.
  • np.frombuffer(buffer,dtype) - интерпретирует буфер как одномерный массив, где:
    buffer - буфер-подобный объект;
    dtype - тип данных NumPy.
  • pioneer_sdk.get_raw_video_frame() – возвращает массив байт представляющий собой jpg картинку.

Выходит, следующая строчка:

camera_frame = cv2.imdecode(np.frombuffer(drone.get_raw_video_frame(), dtype=np.uint8), cv2.IMREAD_COLOR)
  1. Переводим изображение в чёрно белый формат. Создаём маску.

gray = cv2.cvtColor(camera_frame, cv2.COLOR_BGR2GRAY)

И возвращаем 3 значение из метода обнаружения маркеров aruco.detectMarkers(gray, aruco_dict, patameters=aruco_parameters), где:

corners – углы;
ids – номер маркера;
rejected_ing_points – наличие лишних тэгов.
corners, ids, rejected_img_points = aruco.detectMarkers(gray, aruco_dict, parameters=aruco_parameters)
  1. Условием if np.all(ids is not None): проверяем есть ли в аргументе ids элементы в значении True, т.е. видим ли мы тэг:

if np.all(ids is not None):
  1. Получаем оси системы координат, связанной с центром маркера, и выводим их в две переменные: r_vec_rodrigues и t_vec

r_vec_rodrigues, t_vec, _ = aruco.estimatePoseSingleMarkers(corners, size_of_marker, camera_mtx,camera_dist)
  1. Рисуем на изображении полученные оси системы координат, связанной с центром маркера и обводим маркер:

aruco.drawAxis(camera_frame, camera_mtx, camera_dist, r_vec_rodrigues, t_vec, 0.01)
aruco.drawDetectedMarkers(camera_frame, corners)
  1. Приводим полученный вектор ориентации к вектору углов Эйлера:

r_mat = cv2.Rodrigues(r_vec_rodrigues)[0]
p = np.hstack((r_mat.reshape(3, 3), t_vec.reshape(3, 1)))
r_vec_euler = cv2.decomposeProjectionMatrix(p)[6]
  1. Так как очередь используется из одного элемента, то мы проверяем ее на заполненность, если да, то стираем старый элемент и записываем новый:

if buff.full():
buff.get()
buff.put([t_vec, r_vec_euler])
  1. Если обнаружения тэга не происходит, то просто продолжаем выполнение процесса:

except cv2.error:
  continue
  1. Выводим изображение:

cv2.imshow('marker_detection', camera_frame)
  1. Создаём обработку нажатия клавиш и нажатии на Esc закрываем окно с изображением, закрываем буфер аргументом ‘end’ и выходим из функции.

key = cv2.waitKey(1)

if key == 27:  # esc
    print('esc pressed')
    cv2.destroyAllWindows()
    if buff.full():
        buff.get()
    buff.put(['end'])
    break
  1. Создаём функцию перемещения квадрокоптера и ряд переменных:

def drone_control(buff, drone):
  command_x = float(0)
  command_y = float(0)
  command_z = float(1)  # initial flight height
  command_yaw = math.radians(float(0))

  k_p_xy = 0.6
  k_p_z = 0.6
  k_p_yaw = 0.4
  distance = 0.5

  t_vec = None
  r_vec_euler = None

  new_point = True
  new_message = True

  p_r = False
  1. Далее запускаем бесконечный цикл, в котором задаём ряд условий.

while True:
  1. Если получаем новую точку или новое сообщение, то перемещаемся в актуальные точки и сбрасываем флаги.

if new_point and new_message:
  drone.go_to_local_point(x=command_x, y=command_y, z=command_z, yaw=command_yaw)
  new_point = False
  new_message = False
  1. Если в буфере есть какое-либо значение, то получаем его. Если длинна сообщения 1 и его содержимое ‘end’, то завершаем процесс. В противном случае переменой t_vec и r_vec_euler присваиваем матрицы получение из буфера. Выставляем флаг new_massage.

if not buff.empty():
  message = buff.get()
  if len(message) == 1 and message[0] == 'end':
      break
  t_vec = message[0]
  r_vec_euler = message[1]
  new_message = True
  1. Если достигнута точка, то выставляем флаг на перерасчёт координат.

if drone.point_reached():
  p_r = True
  1. Если сработал флаг на перерасчёт координат и t_vec с t_vec_euler не пустые, то происходит расчёт координат:

if p_r and t_vec is not None and r_vec_euler is not None:
  x_camera = t_vec.item(0)
  y_camera = t_vec.item(1)
  z_camera = t_vec.item(2)
  yaw_camera = math.radians(r_vec_euler.item(1))
  c_yaw_prev = math.cos(command_yaw)
  s_yaw_prev = math.sin(command_yaw)
  command_yaw -= k_p_yaw * yaw_camera
  c_yaw_comm = math.cos(command_yaw)
  s_yaw_comm = math.sin(command_yaw)
  command_x += k_p_xy*(x_camera*c_yaw_prev-z_camera*s_yaw_prev+distance*s_yaw_comm)
  command_y += k_p_xy*(x_camera * s_yaw_prev + z_camera * c_yaw_prev - distance * c_yaw_comm)
  command_z -= k_p_z * y_camera
  new_point = True
  p_r = False
  1. Далее используем конструкцию if __name__ == „__main__“:, которая является точкой входа в программу, и сообщаем о запуске скрипта. Всё, что идёт до этого условия, выполнятся всегда: и при вызове в качестве модуля и при вызове, как исполняемый файл.

if __name__ == '__main__':
  print('start')
Подробное описание данной конструкции: https://docs.python.org/3/library/__main__.html
  1. Внутри конструкции try загружаем калибровочный файл yaml, заранее созданный в скрипте get_camera_samples.

try:
  # change if calibration_matrix.yaml file is located in not default location
  calibration_file = open(os.path.join(os.getcwd(), '..', "camera_calibration", "result", "calibration_matrix.yaml"))
  parsed_calibration_file = yaml.load(calibration_file, Loader=yaml.FullLoader)
  mtx = np.array(parsed_calibration_file.get("camera_matrix"))
  dist = np.array(parsed_calibration_file.get("dist_coeff"))
  1. Если калибровочного файла нет, то пишем о необходимости произвести калибровку и завершаем выполнение скрипта:

except FileNotFoundError:
  print('Сan not find calibration data, please run get_camera_samples.py script from camera calibration folder')
  sys.exit(0)
  1. Необходимо воспользоваться конструкцией manager, т.к. просто экземпляр класса нельзя передавать в буфере. Помимо этого, запускаем моторы и взлетаем.

BaseManager.register('Pioneer', Pioneer)
manager = BaseManager()
manager.start()
pioneer_mini = manager.Pioneer()
pioneer_mini.arm()
pioneer_mini.takeoff()
  1. Создаём буфер обмена данными и мульти процессы, исполнительными функциями которых являются ранее созданные image_proc и drone_control запускам их.

buffer = mp.Queue(maxsize=1)
pos_and_orient = mp.Process(target=image_proc, args=(buffer, pioneer_mini, mtx, dist))
drone_flight = mp.Process(target=drone_control, args=(buffer, pioneer_mini))
pos_and_orient.start()
drone_flight.start()
  1. Как только выполнение процессов завершено, то они прикрепляются обратно к основному потоку для их корректного закрытия:

pos_and_orient.join()
drone_flight.join()

pioneer_mini.land()

Вопросы для самостоятельного разбора.

1) Как можно улучшить поворот за тегом?
2) Дописать алгоритм так, чтобы можно было отслеживать маркер независимо от его ориентации.
3) Если два маркера, отслеживать один – ближайший.
4) Дописать алгоритм так, чтобы коптер отслеживал строго определенный маркер из словаря. (например (aruco.DICT_6X6_1000))
5) Дописать алгоритм так, чтобы можно отслеживать два маркера и производить конкретные действия для каждого маркера.
6) Дописать алгоритм, чтобы коптер совершал индикацию светодиодами в зависимости от маркера который он видит.
7) Дописать алгоритм, чтобы коптер производил посадку на маркер.