Hardware & Tooling
PYNQ-DPU
This repo contains info on how to set-up and use PYNQ-DPU.
Please make sure you have installed PYNQ-DPU and have added the xmodel from Vitis AI to the correct directory.
1. Prepare the overlay
We will download the overlay onto the board.
from pynq_dpu import DpuOverlay
# Download the DPU overlay (bitstream file "dpu.bit") onto the board.
overlay = DpuOverlay("dpu.bit")
2. Utility functions
In this section, we will prepare a few functions for later use.
import os
import time
import numpy as np
import cv2
import random
import colorsys
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
%matplotlib inline
from pynq.lib.video import *
from tqdm import tqdm
The load_model() method will automatically prepare the graph
which is used by VART.
# Load the compiled YOLOv3-tiny xmodel; this must happen before
# overlay.runner is used further down.
overlay.load_model("yolov3_new_tiny_224.xmodel")
Let's first define a few useful preprocessing functions.
# Anchor box sizes as flat (width, height) pairs.
# 141,142 originally were 81,82
anchor_list = [
    10, 14,
    23, 27,
    37, 58,
    141, 142,
    135, 169,
    344, 319,
]
anchor_float = list(map(float, anchor_list))
# One row per anchor: column 0 is the first value of each pair, column 1 the second.
anchors = np.asarray(anchor_float).reshape(-1, 2)
def get_class(classes_path):
    """Get model classification information.

    Reads the class-name file at ``classes_path``, one name per line.

    Args:
        classes_path: Path to the text file listing the class names.

    Returns:
        List of class names with surrounding whitespace stripped. Blank
        lines are skipped so that a trailing empty line in the file cannot
        add a phantom class and inflate the class count derived from the
        length of this list.
    """
    with open(classes_path) as f:
        stripped = (line.strip() for line in f)
        return [name for name in stripped if name]
# Load the class list and report it.
classes_path = "img/voc_classes.txt"
class_names = get_class(classes_path)
print(class_names)
num_classes = len(class_names)

# One evenly spaced hue per class at full saturation/value, converted to
# integer 0-255 RGB triples.
hsv_tuples = [(1.0 * idx / num_classes, 1., 1.) for idx in range(num_classes)]
colors = [colorsys.hsv_to_rgb(*hsv) for hsv in hsv_tuples]
colors = [(int(r * 255), int(g * 255), int(b * 255)) for r, g, b in colors]

# Shuffle with a fixed seed so the class -> colour mapping is reproducible,
# then reseed the global generator from system entropy.
random.seed(0)
random.shuffle(colors)
random.seed(None)
def letterbox_image(image, size):
    """Resize image with unchanged aspect ratio using padding.

    Args:
        image: Array whose shape unpacks as (height, width, channels).
        size: Target ``(width, height)``.

    Returns:
        A ``uint8`` array of shape ``(height, width, 3)`` filled with the
        value 128, with the scaled image pasted in the centre.
    """
    src_h, src_w, _ = image.shape
    dst_w, dst_h = size

    # Largest uniform scale at which the image still fits inside the target.
    ratio = min(dst_w/src_w, dst_h/src_h)
    scaled_w, scaled_h = int(src_w*ratio), int(src_h*ratio)
    resized = cv2.resize(image, (scaled_w, scaled_h), interpolation=cv2.INTER_LINEAR)

    canvas = np.full((dst_h, dst_w, 3), 128, dtype=np.uint8)
    top = (dst_h - scaled_h)//2
    left = (dst_w - scaled_w)//2
    canvas[top:top+scaled_h, left:left+scaled_w, :] = resized
    return canvas
def pre_process(image, model_image_size):
    """Image preprocessing.

    Reverses the last (channel) axis, letterboxes the image to the model
    input size, scales values by 1/255 and adds a leading axis.

    Args:
        image: Array whose shape unpacks as (height, width, channels).
        model_image_size: ``(height, width)`` of the model input, each a
            multiple of 32, or ``(None, None)`` to use the image size
            rounded down to multiples of 32.

    Returns:
        ``float32`` array with a leading axis of length 1.
    """
    image = image[..., ::-1]
    image_h, image_w, _ = image.shape

    if model_image_size == (None, None):
        target_size = (image_w - (image_w % 32), image_h - (image_h % 32))
    else:
        assert model_image_size[0]%32 == 0, 'Multiples of 32 required'
        assert model_image_size[1]%32 == 0, 'Multiples of 32 required'
        # letterbox_image takes (width, height), hence the reversal.
        target_size = tuple(reversed(model_image_size))

    boxed_image = letterbox_image(image, target_size)
    image_data = np.array(boxed_image, dtype='float32') / 255.
    return np.expand_dims(image_data, 0)
We will also define a few functions to post-process the output after running a DPU task.
def _get_feats(feats, anchors, num_classes, input_shape):
num_anchors = len(anchors)
anchors_tensor = np.reshape(np.array(anchors, dtype=np.float32), [1, 1, 1, num_anchors, 2])
grid_size = np.shape(feats)[1:3]
nu = num_classes+5
predictions = np.reshape(feats, [-1, grid_size[0], grid_size[1], num_anchors, nu])
grid_y = np.tile(np.reshape(np.arange(grid_size[0]), [-1, 1, 1, 1]), [1, grid_size[1], 1, 1])
grid_x = np.tile(np.reshape(np.arange(grid_size[1]), [1, -1, 1, 1]), [grid_size[0], 1, 1, 1])
grid = np.concatenate([grid_x, grid_y], axis = -1)
grid = np.array(grid, dtype=np.float32)
box_xy = (1/(1+np.exp(-predictions[..., :2])) + grid) / np.array(grid_size[::-1], dtype=np.float32)
box_wh = np.exp(predictions[..., 2:4]) * anchors_tensor / np.array(input_shape[::-1], dtype=np.float32)
box_confidence = 1/(1+np.exp(-predictions[..., 4:5]))
box_class_probs = 1/(1+np.exp(-predictions[..., 5:]))
return box_xy, box_wh, box_confidence, box_class_probs
def correct_boxes(box_xy, box_wh, input_shape, image_shape):
    """Map letterboxed, normalised boxes back to original-image pixels.

    Args:
        box_xy: Box centres as ``(x, y)`` in the last axis, normalised to
            the model input.
        box_wh: Box sizes as ``(w, h)`` in the last axis, normalised to the
            model input. Not modified by this function.
        input_shape: Model input ``(height, width)``.
        image_shape: Original image ``(height, width)``.

    Returns:
        Array whose last axis is ``[y_min, x_min, y_max, x_max]`` scaled by
        ``image_shape``.
    """
    box_yx = box_xy[..., ::-1]
    box_hw = box_wh[..., ::-1]
    input_shape = np.array(input_shape, dtype=np.float32)
    image_shape = np.array(image_shape, dtype=np.float32)

    # Size of the image inside the letterbox, then the padding offset and
    # the scale needed to undo the letterboxing.
    new_shape = np.around(image_shape * np.min(input_shape / image_shape))
    offset = (input_shape - new_shape) / 2. / input_shape
    scale = input_shape / new_shape

    box_yx = (box_yx - offset) * scale
    # Not in place: box_hw is a reversed view of the caller's box_wh, so
    # "*=" would silently rescale the caller's array.
    box_hw = box_hw * scale

    box_mins = box_yx - (box_hw / 2.)
    box_maxes = box_yx + (box_hw / 2.)
    boxes = np.concatenate([
        box_mins[..., 0:1],
        box_mins[..., 1:2],
        box_maxes[..., 0:1],
        box_maxes[..., 1:2]
    ], axis=-1)
    boxes *= np.concatenate([image_shape, image_shape], axis=-1)
    return boxes
def boxes_and_scores(feats, anchors, classes_num, input_shape, image_shape):
    """Decode one output map into flat box and per-class score arrays.

    Args:
        feats: Raw output map for one detection scale.
        anchors: Anchor ``(w, h)`` pairs used by this scale.
        classes_num: Number of classes.
        input_shape: Model input ``(height, width)``.
        image_shape: Original image ``(height, width)``.

    Returns:
        Tuple ``(boxes, box_scores)``: ``boxes`` reshaped to ``(-1, 4)`` and
        ``box_scores`` (confidence times class probability) reshaped to
        ``(-1, classes_num)``.
    """
    box_xy, box_wh, box_confidence, box_class_probs = _get_feats(
        feats, anchors, classes_num, input_shape)
    boxes = correct_boxes(box_xy, box_wh, input_shape, image_shape)
    boxes = np.reshape(boxes, [-1, 4])
    box_scores = box_confidence * box_class_probs
    box_scores = np.reshape(box_scores, [-1, classes_num])
    return boxes, box_scores
def draw_bbox(image, bboxes, classes, length):
    """Draw detection frame(s) onto ``image`` in place.

    Args:
        image: Array whose shape unpacks as (height, width, channels);
            rectangles are drawn directly into it.
        bboxes: Either one box or a 2-D array of boxes, each in
            ``[x_min, y_min, x_max, y_max, probability, cls_id]`` format.
        classes: Sequence of class names; only its length is used, to size
            the colour palette indexed by ``cls_id``.
        length: Kept for backward compatibility. Callers used it to signal
            whether ``bboxes`` held several boxes (truthy) or one (falsy);
            both layouts are now detected from ``bboxes`` itself.

    Returns:
        The same ``image`` object.
    """
    num_classes = len(classes)
    image_h, image_w, _ = image.shape

    hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
    colors = [colorsys.hsv_to_rgb(*hsv) for hsv in hsv_tuples]
    colors = [(int(r * 255), int(g * 255), int(b * 255)) for r, g, b in colors]
    # A private generator seeded with 0 gives the same shuffle as seeding the
    # module-level one, without clobbering the global random state.
    random.Random(0).shuffle(colors)

    rows = np.atleast_2d(np.asarray(bboxes))
    if rows.size == 0:
        return image

    bbox_thick = int(0.6 * (image_h + image_w) / 600)
    for bbox in rows:
        # Plain Python ints: cv2 drawing functions expect integer points.
        x_min, y_min, x_max, y_max = (int(v) for v in bbox[:4])
        bbox_color = colors[int(bbox[5])]
        cv2.rectangle(image, (x_min, y_min), (x_max, y_max), bbox_color, bbox_thick)
    return image
def nms_boxes(boxes, scores, iou_threshold=0.55):
    """Suppress non-maximal boxes.

    # Arguments
        boxes: ndarray of shape (N, 4), corner coordinates of each box
            (min corner in columns 0-1, max corner in columns 2-3).
        scores: ndarray of shape (N,), scores of objects.
        iou_threshold: a box is dropped when its overlap with an already
            kept, higher-scoring box exceeds this value.

    # Returns
        keep: list of indices into ``boxes`` of the surviving boxes, in
            descending score order.
    """
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    # Areas use the inclusive-pixel (+1) convention throughout.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        best, rest = order[0], order[1:]
        keep.append(best)

        xx1 = np.maximum(x1[best], x1[rest])
        yy1 = np.maximum(y1[best], y1[rest])
        xx2 = np.minimum(x2[best], x2[rest])
        yy2 = np.minimum(y2[best], y2[rest])

        w1 = np.maximum(0.0, xx2 - xx1 + 1)
        h1 = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w1 * h1

        ovr = inter / (areas[best] + areas[rest] - inter)
        order = rest[ovr <= iou_threshold]

    return keep
def draw_boxes(image, boxes, scores, classes):
    """Plot the image with labelled detection rectangles using matplotlib.

    Args:
        image: Image passed through a BGR-to-RGB conversion before display.
        boxes: Iterable of ``[top, left, bottom, right]`` boxes.
        scores: Score for each box, indexed in step with ``boxes``.
        classes: Class index for each box, indexed in step with ``boxes``;
            used to look up the module-level ``class_names`` and ``colors``.

    Returns:
        The matplotlib axes the image and boxes were drawn on.
    """
    _, axes = plt.subplots(1)
    axes.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    for idx, (top, left, bottom, right) in enumerate(boxes):
        box_w = right - left
        box_h = bottom - top
        class_index = classes[idx]
        label = '{}: {:.4f}'.format(class_names[class_index], scores[idx])
        # matplotlib wants colour channels in the 0-1 range.
        rgb = tuple(channel/255 for channel in colors[class_index])

        axes.add_patch(Rectangle((left, top), box_w, box_h,
                                 edgecolor=rgb, facecolor='none'))
        axes.annotate(label, (left + box_w*0.5, top + box_h*0.5),
                      color=rgb, weight='bold',
                      fontsize=12, ha='center', va='center')
    return axes
def evaluate(yolo_outputs, image_shape, class_names, anchors):
    """Turn raw network outputs into final boxes, scores and class ids.

    Args:
        yolo_outputs: List of raw output maps, one per detection scale.
        image_shape: Original image ``(height, width)``.
        class_names: Sequence of class names; its length is the class count.
        anchors: Array of anchors, indexed per scale through a fixed mask.

    Returns:
        Tuple ``(boxes, scores, classes)`` concatenated over all classes,
        keeping only candidates scoring at least 0.2 that survive per-class
        non-maximum suppression.
    """
    score_thresh = 0.2
    anchor_mask = [[1, 2, 3], [3, 4, 5]]
    class_count = len(class_names)

    # Model input size is derived as 32x the spatial size of the first output.
    input_shape = np.array(np.shape(yolo_outputs[0])[1:3]) * 32

    decoded = [
        boxes_and_scores(output, anchors[anchor_mask[level]], class_count,
                         input_shape, image_shape)
        for level, output in enumerate(yolo_outputs)
    ]
    boxes = np.concatenate([level_boxes for level_boxes, _ in decoded], axis=0)
    box_scores = np.concatenate([level_scores for _, level_scores in decoded], axis=0)

    mask = box_scores >= score_thresh
    kept_boxes, kept_scores, kept_classes = [], [], []
    for class_id in range(class_count):
        selected = mask[:, class_id]
        candidate_boxes = boxes[selected]
        candidate_scores = box_scores[:, class_id][selected]

        survivors = nms_boxes(candidate_boxes, candidate_scores)
        candidate_boxes = candidate_boxes[survivors]
        candidate_scores = candidate_scores[survivors]

        kept_boxes.append(candidate_boxes)
        kept_scores.append(candidate_scores)
        kept_classes.append(np.ones_like(candidate_scores, dtype=np.int32) * class_id)

    boxes_ = np.concatenate(kept_boxes, axis=0)
    scores_ = np.concatenate(kept_scores, axis=0)
    classes_ = np.concatenate(kept_classes, axis=0)
    return boxes_, scores_, classes_
Keep in mind that our original images are 640x480, so we need to preprocess them later to make sure they fit our model.
# Collect the test images (names ending in "jpg") in sorted order.
image_folder = 'new'
original_images = sorted(
    name for name in os.listdir(image_folder) if name.endswith("jpg")
)
total_images = len(original_images)
3. Use VART
Now we should be able to use VART to do object detection.
We will use the tools from Vitis AI to interface with the DPU
# Runner object used to execute jobs on the DPU.
dpu = overlay.runner
# Tensor descriptors for the model's inputs and outputs.
inputTensors = dpu.get_input_tensors()
outputTensors = dpu.get_output_tensors()
# Shape of the (single) input tensor; index 0 is used as the divisor below.
shapeIn = tuple(inputTensors[0].dims)
print(shapeIn) #double check input image size
# Shapes of the two output tensors, used to size and reshape the output buffers.
shapeOut0 = (tuple(outputTensors[0].dims))
shapeOut1 = (tuple(outputTensors[1].dims))
# Output data size divided by shapeIn[0]
# (presumably the batch dimension, giving a per-image size -- TODO confirm).
outputSize0 = int(outputTensors[0].get_data_size() / shapeIn[0])
outputSize1 = int(outputTensors[1].get_data_size() / shapeIn[0])
We can define a few buffers to store input and output data. They will be reused during multiple runs.
# Reusable float32 buffers passed to dpu.execute_async on every run:
# one for the input tensor and one for each of the two output tensors.
input_data = [np.empty(shapeIn, dtype=np.float32, order="C")]
output_data = [np.empty(shapeOut0, dtype=np.float32, order="C"),
np.empty(shapeOut1, dtype=np.float32, order="C")]
# Alias of the input buffer: writing into `image` fills input_data[0].
image = input_data[0]
Remember that we have a list of original_images.
We can now define a new function run() which takes the image index as
the input, then decode and post-process the output as the detection result.
With the argument display set to True, the original image as well as the
detected objects and their labels can be rendered.
It is obvious that the range of image_index should be [0, total_images-1].
def run(image_index, display=False):
    """Run detection on one image from ``original_images``.

    Uses the module-level DPU runner and buffers (``dpu``, ``image``,
    ``input_data``, ``output_data``, ``shapeIn``, ``shapeOut0``,
    ``shapeOut1``).

    Args:
        image_index: Index into ``original_images``.
        display: When true, plot the image with its detections.

    Raises:
        OSError: If the image file cannot be read or decoded.
    """
    # Read input image
    image_path = os.path.join(image_folder, original_images[image_index])
    input_image = cv2.imread(image_path)
    if input_image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError("Could not read image: {}".format(image_path))

    # Pre-processing
    image_size = input_image.shape[:2]
    image_data = np.array(pre_process(input_image, (224, 224)), dtype=np.float32)

    # Fetch data to DPU and trigger it
    image[0,...] = image_data.reshape(shapeIn[1:])
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)

    # Retrieve output data
    conv_out0 = np.reshape(output_data[0], shapeOut0)
    conv_out1 = np.reshape(output_data[1], shapeOut1)
    yolo_outputs = [conv_out0, conv_out1]

    # Decode output from YOLOv3
    boxes, scores, classes = evaluate(yolo_outputs, image_size, class_names, anchors)

    if display:
        _ = draw_boxes(input_image, boxes, scores, classes)
    print("Number of detected objects: {}".format(len(boxes)))
Let's run it for 1 image and print out the detected label.
# Detect on the first image in the folder and plot the result.
run(0, display=True)
We can also run it for multiple images as shown below. In this example we have only used 1 thread; in principle, users should be able to boost the performance by employing more threads.
# Time one single-threaded pass over every image and report throughput.
time1 = time.time()
# A plain loop: run() is called for its side effects, so there is no
# reason to build a throwaway list of its return values.
for i in range(total_images):
    run(i)
time2 = time.time()
fps = total_images/(time2-time1)
print("Performance: {} FPS".format(fps))
4. Real-Time YOLO
Now that we can successfully run the model on loaded images, let's prepare it for real-time processing.
To do so first we have to set up the displayport
# Set up the DisplayPort output at 640x480 with 24 bits per pixel.
displayport = DisplayPort()
# width/height are also read later by the camera setup and post_process().
width = 640
height =480
displayport.configure(VideoMode(width, height, 24), PIXEL_RGB)
Then we have to set up the input camera using opencv
# Open camera 0 and request frames matching the display resolution.
capture = cv2.VideoCapture(0)
if not capture.isOpened():
    # Fail here with a clear message rather than later on a None frame.
    raise RuntimeError("Could not open camera 0")
capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
Finally we have to modify the run function such that it can handle frames instead of loading images
def run(image_array, display=False):
    """Run detection on one in-memory frame.

    Uses the module-level DPU runner and buffers (``dpu``, ``image``,
    ``input_data``, ``output_data``, ``shapeIn``, ``shapeOut0``,
    ``shapeOut1``).

    Args:
        image_array: Frame as an array whose first two dimensions are
            (height, width).
        display: When true, plot the frame with its detections.

    Returns:
        Tuple ``(boxes, scores, classes)`` as produced by ``evaluate``.

    Raises:
        ValueError: If ``image_array`` is None (e.g. a failed camera read).
    """
    if image_array is None:
        raise ValueError("run() received no frame (image_array is None)")

    # Read input image
    input_image = image_array
    image_size = input_image.shape[:2]

    # Pre-processing
    image_data = np.array(pre_process(input_image, (224, 224)), dtype=np.float32)

    # Fetch data to DPU and trigger it
    image[0,...] = image_data.reshape(shapeIn[1:])
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)

    # Retrieve output data
    conv_out0 = np.reshape(output_data[0], shapeOut0)
    conv_out1 = np.reshape(output_data[1], shapeOut1)
    yolo_outputs = [conv_out0, conv_out1]

    # Decode output from YOLOv3
    boxes, scores, classes = evaluate(yolo_outputs, image_size, class_names, anchors)

    if display:
        draw_boxes(input_image, boxes, scores, classes)
    print("Number of detected objects: {}".format(len(boxes)))
    return boxes, scores, classes
To check if it works you can try the below code
# Grab one frame and run detection on it as a smoke test.
ret_val, img = capture.read()
if not ret_val:
    # capture.read() reports failure through its first return value.
    raise RuntimeError("Failed to read a frame from the camera")
input_image = img
image_size = input_image.shape[:2]
print(image_size)
run(img, display=True)
You should be able to see the camera image in the Jupyter notebook.
Now we have to make a function to post process the frames such that the bounding boxes can be displayed on the monitor.
def post_process(img):
    """Run detection on a frame and return it with boxes drawn for display.

    Args:
        img: Camera frame; detection rectangles are drawn into it in place.

    Returns:
        Array of shape ``(height, width, 3)`` (module-level display size)
        holding the annotated frame in its top-left corner, zeros elsewhere.
    """
    # display=False: plotting a matplotlib figure for every frame of a
    # real-time loop is slow and accumulates open figures.
    boxes, scores, classes = run(img, display=False)
    output_image = np.zeros((height, width, 3))

    # NOTE(review): draw_bbox builds one colour per entry here and indexes it
    # by class id, so any class id >= 2 would raise IndexError -- confirm this
    # list matches the model's classes.
    identity = ["PCB", "Copper"]

    num_boxes = len(boxes)
    if num_boxes:
        boxes = np.asarray(boxes).reshape(-1, 4)
        # evaluate() yields [top, left, bottom, right]; draw_bbox expects
        # [x_min, y_min, x_max, y_max], so swap each coordinate pair.
        xyxy = boxes[:, [1, 0, 3, 2]]
        bboxes = np.column_stack([xyxy, scores, classes])
        img = draw_bbox(img, bboxes, identity, num_boxes)

    output_image[:img.shape[0], :img.shape[1], :] = img
    return output_image
Then finally to run the real-time yolov3-tiny
# Stream camera frames through the detector to the DisplayPort monitor.
for _ in tqdm(range(1000000000000)):
    ret_val, img = capture.read()
    if not ret_val:
        print("Failed to read a frame from the camera; stopping.")
        break
    # Run detection exactly once per frame: a second call would repeat the
    # inference on a frame that already has rectangles drawn into it.
    test_frame = post_process(img)
    frame = displayport.newframe()
    frame[:,:,:] = test_frame
    displayport.writeframe(frame)
