test_median_blur_numpi_video_threaded.py <video file name>

Shows how Python Ray capabilities can be used
to organize a parallel frame-processing pipeline.
On the head node, the Ray server can be started with the command:
taskset -c 1 ray start --head --port=6379 --num-cpus=7 --resources='{"cloudFPGA": 7}'

Client nodes can then be connected with the command:
ray start --address='10.12.0.10:6379' --redis-password='5241590000000000' --num-cpus=7 --resources='{"cloudFPGA": 7}'
or directly by calling ray.init() in this script.
42 from ray.util.queue
import Queue
# Logging verbosity used at module level and as the default for the helpers below.
debug_level = logging.INFO

# C++ header that carries the FRAME_WIDTH / FRAME_HEIGHT macros. The path is built by
# plain concatenation, so the 'cFpRootDir' environment variable must end with a path
# separator; a missing variable raises KeyError.
config_file = os.environ['cFpRootDir'] + "HOST/vision/median_blur/languages/cplusplus/include/config.h"
logging.basicConfig(level=debug_level)

# Scan the header line by line for the two frame-dimension macros. The value is taken
# as the third whitespace-separated token, i.e. "#define FRAME_WIDTH <value>".
# NOTE(review): the source listing for this section was incomplete (the loop header and
# the failure handling were missing); the structure below is reconstructed from the
# visible statements - confirm against the original file.
width = None
height = None
with open(config_file) as cfg:
    for line in cfg:
        if "#define FRAME_WIDTH" in line:
            width = int(line.split()[2])
        elif "#define FRAME_HEIGHT" in line:
            height = int(line.split()[2])

# Fail explicitly when either macro is absent instead of tripping over an unbound name.
if width is None or height is None:
    logging.error("Coudln't find FRAME_WIDTH or FRAME_HEIGHT in " + config_file + ". Aborting...")
    # NOTE(review): the exit status used by the original is not visible; 1 is assumed.
    raise SystemExit(1)

logging.info("Found image dimensions in " + config_file + ": width = " + str(width) + ", height = " + str(height))

# Number of pixels in one frame; used by the worker as the flattened image length.
total_size = height * width
def crop_square_roi(img, size, interpolation=cv.INTER_AREA, debug_level=debug_level):
    """Cut a centred region out of ``img`` and resize it when it does not match the frame size.

    The module-level ``height`` and ``width`` define the target region.

    Args:
        img: Image array; only its first two dimensions (rows, columns) are inspected.
        size: Edge length handed to ``cv.resize`` as ``(size, size)``.
        interpolation: OpenCV interpolation flag forwarded to ``cv.resize``.
        debug_level: Logging level passed to ``logging.basicConfig``.

    Returns:
        The cropped array itself when it is already ``height`` x ``width``; otherwise the
        result of ``cv.resize(crop, (size, size))``.

    NOTE(review): the source listing of this function was incomplete (the line binding
    ``h``/``w``, the ``else`` keywords and the ``return`` were missing). They are
    reconstructed here from the visible statements - confirm against the original file.
    """
    logging.basicConfig(level=debug_level)

    # Same unpacking idiom as patch_sqaure_roi uses for its inputs.
    h, w = img.shape[:2]

    if (h > height) and (w > width):
        # Image is strictly larger than the target: take the centred height x width window.
        roi_x_pos = int((w - width) / 2)
        roi_y_pos = int((h - height) / 2)
        crop_img = img[roi_y_pos:roi_y_pos + height, roi_x_pos:roi_x_pos + width]
    else:
        logging.warning(f"The input image of [{h}x{w}] is not bigger to embed a ROI of [{height}x{width}]. Will just resize")
        # Largest centred square that fits both the image and the target frame.
        min_size = np.amin([np.amin([h, w]), np.amin([height, width])])
        crop_img = img[int(h/2-min_size/2):int(h/2+min_size/2), int(w/2-min_size/2):int(w/2+min_size/2)]

    if (crop_img.shape[0] != height) or (crop_img.shape[1] != width):
        # NOTE(review): the message reports [height x width] but the resize target is
        # (size, size); the two only agree when size == height == width.
        logging.warning(f"The image was resized from [{crop_img.shape[0]} x {crop_img.shape[1]}] to [{height}x{width}]")
        resized = cv.resize(crop_img, (size, size), interpolation=interpolation)
    else:
        resized = crop_img
    return resized
def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA, debug_level=debug_level):
    """Paste a processed grayscale ``frame`` into the centre of a copy of ``orig``.

    Args:
        orig: Original image; it is copied, never modified. It is indexed with three
            axes when patching, so a colour image is expected.
        frame: Processed image, converted with ``cv.COLOR_GRAY2RGB`` before pasting.
        interpolation: OpenCV interpolation flag forwarded to ``cv.resize``.
        debug_level: Logging level passed to ``logging.basicConfig``.

    Returns:
        The patched copy of ``orig`` when ``orig`` is strictly larger than ``frame`` in
        both dimensions; otherwise ``frame`` resized to the dimensions of ``orig`` when
        they differ.

    NOTE(review): the source listing of this function was incomplete (the ``else``
    keywords, the fallback assignment and the ``return`` were missing). They are
    reconstructed here from the visible statements - confirm against the original file.
    """
    logging.basicConfig(level=debug_level)

    h_orig, w_orig = orig.shape[:2]
    h_frame, w_frame = frame.shape[:2]

    patched_img = orig.copy()

    if (h_orig > h_frame) and (w_orig > w_frame):
        # Top-left corner of the centred window that receives the processed frame.
        roi_x_pos = int((w_orig - w_frame) / 2)
        roi_y_pos = int((h_orig - h_frame) / 2)
        frame_backtorgb = cv.cvtColor(np.float32(frame), cv.COLOR_GRAY2RGB)
        patched_img[roi_y_pos:roi_y_pos + h_frame, roi_x_pos:roi_x_pos + w_frame, :] = frame_backtorgb
    else:
        # NOTE(review): assumed fallback. Without it the size check below could never
        # fire, because a copy of ``orig`` always has the shape of ``orig``.
        patched_img = frame
        logging.warning(f"The input image of [{h_orig}x{w_orig}] is not bigger to embed a ROI of [{h_frame}x{w_frame}]. Will just resize")

    if (patched_img.shape[0] != h_orig) or (patched_img.shape[1] != w_orig):
        logging.warning(f"The image was resized from [{patched_img.shape[0]} x {patched_img.shape[1]}] to [{h_orig}x{w_orig}]")
        # cv.resize takes the target as (width, height).
        resized = cv.resize(patched_img, (w_orig, h_orig), interpolation=interpolation)
    else:
        resized = patched_img
    return resized
# Connect this script to the running Ray cluster through the Ray client endpoint.
ray.init(address='ray://10.12.0.10:10001')

# Query the resource table once and tolerate a cluster that does not advertise a
# given resource, instead of raising KeyError while printing the summary.
cluster_resources = ray.cluster_resources()

# NOTE(review): the "nodes in total" line is reconstructed - the listing shows three
# format arguments but only two placeholder lines. Confirm against the original file.
print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} cloudFPGA resources in total
'''.format(len(ray.nodes()), cluster_resources.get('CPU', 0), cluster_resources.get('cloudFPGA', 0)))

# Shared queue of FPGA endpoints; each worker takes one entry while it uses the
# device and puts it back afterwards.
fpgas_queue = Queue(maxsize=100)
# consumer: Ray remote task that blurs a single video frame. The decorator requests one
# unit of the custom 'cloudFPGA' resource for every invocation.
#
# Parameters (as visible in the signature):
#   accel_mode  - NOTE(review): presumably selects between the FPGA path and the OpenCV
#                 path below; the branch keywords are not visible in this listing.
#   fpgas_queue - queue of FPGA endpoints; items are indexed as item[0] and int(item[1]).
#   frame       - input frame, converted with cv.COLOR_RGB2GRAY before processing.
#   debug_level - logging level, also forwarded to the blur call and to patch_sqaure_roi.
#
# NOTE(review): this listing is incomplete - the leading numbers are listing artefacts,
# and the lines defining `orig`, the if/else keywords and the return statement are
# missing. Restore them from the original file before running.
134 @ray.remote(resources={'cloudFPGA': 1})
135 def consumer(accel_mode, fpgas_queue, frame, debug_level=debug_level):
136 logging.basicConfig(level=debug_level)
# Work on a single-channel copy of the frame.
139 frame_ret = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
# Take one FPGA endpoint out of the shared queue, blocking with timeout=100 until one
# is available.
143 next_item = fpgas_queue.get(block=
True, timeout=100)
144 logging.debug(f
"will work on {next_item} and then put in back in the fpgas_queue")
# The blur call receives the image as a flat array together with its length
# (module-level total_size) and the endpoint taken from the queue.
146 image = frame_ret.flatten()
147 output_array = trieres.vision.median_blur(image, total_size, next_item[0],
int(next_item[1]), debug_level=debug_level)
# Restore the 2-D layout using the module-level height and width.
148 frame_ret = np.reshape(output_array, (height, width))
# Hand the endpoint back so another task can use it.
# NOTE(review): the put is not inside a try/finally in the visible code, so an exception
# raised by the blur call would leave the endpoint out of the queue - confirm.
150 fpgas_queue.put(next_item)
151 logging.debug(f
"finished working on {next_item} Now it is back in the fpgas_queue")
# OpenCV median blur with aperture size 9.
# NOTE(review): presumably the alternative to the FPGA path above - confirm.
153 frame_ret = cv.medianBlur(frame_ret, 9)
# Paste the processed frame back into the original image.
# NOTE(review): `orig` is not defined anywhere in the visible lines.
155 frame_ret =
patch_sqaure_roi(orig, frame_ret, cv.INTER_AREA, debug_level=debug_level)
# Convert the single-channel result back to three channels as 8-bit data.
# NOTE(review): presumably an alternative to the patch step above rather than a
# follow-up, since the patched image is already a copy of the original - confirm.
157 frame_ret = cv.cvtColor(np.uint8(frame_ret),cv.COLOR_GRAY2RGB)
# Top-level pipeline: read the input video, submit one `consumer` task per frame, wait
# for all results, write them to an output video and log the time spent in each phase.
#
# NOTE(review): this listing is incomplete - the leading numbers are listing artefacts,
# and the lines defining `fn`, `frames` and `accel_mode`, as well as the body of the
# read loop, are missing. Restore them from the original file before running.

# --- Phase 1: frame capture (timed by tic_capture / toc_capture) ---
166 tic_capture = time.perf_counter()
168 cap = cv.VideoCapture(fn)
# Only a message is printed here when the capture could not be opened.
173 if (cap.isOpened()==
False):
174 print(
"Error opening video stream or file")
# Read frames for as long as the capture reports itself open.
# NOTE(review): how `ret` is checked and how `frames` is filled is not visible here.
178 while(cap.isOpened()):
180 ret, frame = cap.read()
189 toc_capture = time.perf_counter()
# --- Phase 2: task submission (timed by tic_consumers / toc_consumers) ---
191 tic_consumers = time.perf_counter()
# One remote task per captured frame; the list holds the handles later passed to ray.get.
192 consumers = [consumer.remote(accel_mode, fpgas_queue, frames[i], debug_level=debug_level)
for i
in range(len(frames))]
# Register nine FPGA endpoints as [address, port] string pairs; consumer passes item[0]
# and int(item[1]) to the blur call. The comprehension is used only for its side effect
# and the resulting list is discarded.
# In the visible statement order the queue is filled after the tasks were submitted, so
# tasks that call get(block=True) wait until an endpoint is put here.
221 [fpgas_queue.put(j)
for j
in ([ [
"10.12.200.17" ,
"2718"],
222 [
"10.12.200.20" ,
"2718"],
223 [
"10.12.200.69" ,
"2718"],
224 [
"10.12.200.208",
"2718"],
225 [
"10.12.200.179",
"2718"],
226 [
"10.12.200.223",
"2718"],
227 [
"10.12.200.182",
"2718"],
228 [
"10.12.200.232",
"2718"],
229 [
"10.12.200.59" ,
"2718"], ])]
231 toc_consumers = time.perf_counter()
# --- Phase 3: execution (timed by tic_exec / toc_exec) ---
233 tic_exec = time.perf_counter()
# Collect the results of all submitted tasks, in submission order.
234 results = ray.get(consumers)
235 toc_exec = time.perf_counter()
236 logging.info(f
"Tasks executed")
# --- Phase 4: save the output video (timed by tic_save / toc_save) ---
238 tic_save = time.perf_counter()
# Output file name is the input name with "_out.avi" appended.
239 video_name =
str(fn)+
"_out.avi"
# MJPG writer at 30 fps; the frame size (width, height) is taken from the first result.
# NOTE(review): results[0] raises IndexError when no frame was captured.
241 video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc(
'M',
'J',
'P',
'G'), 30, (results[0].shape[1],results[0].shape[0]))
242 for t
in range(len(results)):
243 video_out.write(results[t])
# NOTE(review): no release of `video_out` or `cap` is visible in this listing - confirm.
245 logging.info(
"Saved video: " + video_name)
246 toc_save = time.perf_counter()
# --- Timing summary for the four phases ---
248 logging.info(f
"Tasks executed : {toc_exec - tic_exec:0.4f} seconds")
249 logging.info(f
"Consumers time : {toc_consumers - tic_consumers:0.4f} seconds")
250 logging.info(f
"Loading frames : {toc_capture - tic_capture:0.4f} seconds")
251 logging.info(f
"Saving video : {toc_save - tic_save:0.4f} seconds")