25 MedianBlur multithreaded video processing sample.
27 test_median_blur_numpi_video_threaded.py {<video device number>|<video file name>}
29 Shows how Python's threading capabilities can be used
30 to organize a parallel processing pipeline for captured frames,
31 giving smoother playback.
36 f - switch between the CPU and the cloudFPGA version (pre-programming is required)
37 space - switch between multi-threaded and single-threaded processing
41 from __future__
import print_function
45 video_common_lib=os.environ[
'cFpRootDir'] +
"HOST/vision/common/languages/python/var"
46 sys.path.append(video_common_lib)
51 import multiprocessing
53 from multiprocessing.pool
import ThreadPool
54 from collections
import deque
57 manager = multiprocessing.Manager()
58 from threading
import Lock
60 from common
import clock, draw_str, StatValue
83 self.
_start_start = datetime.datetime.now()
87 self.
_end_end = datetime.datetime.now()
95 return (self.
_end_end - self.
_start_start).total_seconds()
114 config_file=os.environ[
'cFpRootDir'] +
"HOST/vision/median_blur/languages/cplusplus/include/config.h"
116 with open(config_file)
as cfg:
118 if "#define FRAME_WIDTH" in line:
119 width =
int(line.split()[2])
120 elif "#define FRAME_HEIGHT" in line:
121 height =
int(line.split()[2])
123 print(
"Found in " + config_file +
": width = "+
str(width) +
", height = "+
str(height))
124 total_size = height * width
126 print(
"Coudln't find FRAME_WIDTH or FRAME_HEIGHT in "+ config_file +
". Aborting...")
133 cap = video.create_capture(fn)
137 lock = manager.Lock()
143 if (h>height)
and (w>width):
144 roi_x_pos =
int((w-width) /2)
145 roi_y_pos =
int((h-height)/2)
146 crop_img = img[
int(roi_y_pos):
int(roi_y_pos+height),
int(roi_x_pos):
int(roi_x_pos+width)]
149 print(
"WARNING: The input image of [", h ,
" x ", w ,
"] is not bigger to crop a ROI of [", height ,
" x ", width,
"]. Will just resize")
151 min_size = np.amin([np.amin([h,w]), np.amin([height,width])])
153 crop_img = img[
int(h/2-min_size/2):
int(h/2+min_size/2),
int(w/2-min_size/2):
int(w/2+min_size/2)]
156 if ((crop_img.shape[0] != height)
or (crop_img.shape[1] != width)):
157 print(
"WARNING: The image was resized from [", crop_img.shape[0] ,
" x ", crop_img.shape[1] ,
"] to [", height ,
" x ", width,
"]")
158 resized = cv.resize(crop_img , (size, size), interpolation=interpolation)
166 h_orig, w_orig = orig.shape[:2]
167 h_frame, w_frame = frame.shape[:2]
171 if (h_orig>h_frame)
and (w_orig>w_frame):
172 roi_x_pos =
int((w_orig-w_frame)/2)
173 roi_y_pos =
int((h_orig-h_frame)/2)
174 frame_backtorgb = cv.cvtColor(np.float32(frame),cv.COLOR_GRAY2RGB)
175 patched_img[
int(roi_y_pos):
int(roi_y_pos+h_frame),
int(roi_x_pos):
int(roi_x_pos+w_frame),:] = frame_backtorgb
178 print(
"WARNING: The input image of [", h_orig ,
" x ", w_orig ,
"] is not bigger to embed a ROI of [", h_frame ,
" x ", w_frame,
"]. Will just resize")
179 print(
"after 1st if")
181 if ((patched_img.shape[0] != h_orig)
or (patched_img.shape[1] != w_orig)):
182 print(
"WARNING: The image was resized from [", patched_img.shape[0] ,
" x ", patched_img.shape[1] ,
"] to [", h_orig ,
" x ", w_orig,
"]")
183 resized = cv.resize(patched_img , (w_orig, h_orig), interpolation=interpolation)
185 resized = patched_img
190 def process_frame(frame, t0, threaded_mode, accel_mode, fpga, fpgas):
193 frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
203 image = frame.flatten()
205 output_array = trieres.vision.median_blur(image, total_size, fpga[0],
int(fpga[1]))
208 frame = np.reshape(output_array, (height, width))
210 print(
"Declare free the fpga: "+
str(fpga))
215 fpgas.appendleft(fpga)
219 frame = cv.medianBlur(frame, 9)
222 print(
"returning from process_frame")
226 threaded_mode =
False
229 fpgas = deque([[
"10.12.200.73" ,
"2718"]])
238 threadn = cv.getNumberOfCPUs()
239 pool = ThreadPool(processes = threadn)
242 latency = StatValue()
243 frame_interval = StatValue()
244 last_frame_time =
clock()
247 print(
"Before while len(pending)="+
str(len(pending)))
248 while len(pending) > 0
and pending[0].ready() :
249 print(
"After while len(pending)="+
str(len(pending)))
250 print(
"Before pending.popleft().get()")
251 res, t0 = pending.popleft().get()
252 print(
"After pending.popleft().get(): len(pending)="+
str(len(pending)))
256 latency.update(
clock() - t0)
257 draw_str(res, (20, 20),
"threaded : " +
str(threaded_mode))
258 draw_str(res, (20, 40),
"cloudFPGA : " +
str(accel_mode))
259 draw_str(res, (20, 60),
"latency : %.1f ms" % (latency.value*1000))
260 draw_str(res, (20, 80),
"frame interval : %.1f ms" % (frame_interval.value*1000))
261 draw_str(res, (20, 100),
"FPS : %.1f" % (1.0/frame_interval.value))
265 video_name =
str(fn)+
"_out.avi"
266 video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc(
'M',
'J',
'P',
'G'), 30, (res.shape[1],res.shape[0]))
268 cv.imshow(
'threaded video', res)
269 if len(pending) < threadn:
270 _ret, frame = cap.read()
272 print(
"Reached EOF.")
273 print(
"Saved video: " + video_name)
278 frame_interval.update(t - last_frame_time)
284 fpga = fpgas.popleft()
285 print(
"Reserved the fpga:"+
str(fpga))
290 task = pool.apply_async(process_frame, (frame.copy(), t, threaded_mode, accel_mode, fpga, fpgas))
293 task =
DummyTask(process_frame(frame, t, threaded_mode, accel_mode, fpga, fpgas))
297 print(
"Waiting for a free fpga")
299 print(
"Waiting for a free thread")
306 threaded_mode =
not threaded_mode
308 accel_mode =
not accel_mode
317 print(
"[INFO] elasped time: {:.2f}".format(fps.elapsed()))
318 print(
"[INFO] approx. FPS: {:.2f}".format(fps.fps()))
320 if __name__ ==
'__main__':
323 cv.destroyAllWindows()
def draw_str(dst, target, s)