25 WarpTransform multithreaded video processing sample.
27 test_warp_transform_numpi_video_threaded.py {<video device number>|<video file name>}
29 Shows how python threading capabilities can be used
30 to organize parallel captured frame processing pipeline
31 for smoother playback.
36 f - switch between CPU and cloudFPGA version (pre-programming is required)
37 space - switch between multi and single threaded processing
41 from __future__
import print_function
45 video_common_lib=os.environ[
'cFpRootDir'] +
"HOST/vision/common/languages/python/var"
46 sys.path.append(video_common_lib)
51 import multiprocessing
53 from multiprocessing.pool
import ThreadPool
54 from collections
import deque
57 manager = multiprocessing.Manager()
58 from threading
import Lock
60 from common
import clock, draw_str, StatValue
64 trieres_lib=os.environ[
'cFpRootDir'] +
"HOST/vision/warp_transform/languages/python/build"
65 sys.path.append(trieres_lib)
67 import _trieres_warp_transform_numpi
82 self.
_start_start = datetime.datetime.now()
86 self.
_end_end = datetime.datetime.now()
94 return (self.
_end_end - self.
_start_start).total_seconds()
113 config_file=os.environ[
'cFpRootDir'] +
"HOST/vision/warp_transform/languages/cplusplus/include/config.h"
115 with open(config_file)
as cfg:
117 if "#define FRAME_WIDTH" in line:
118 width =
int(line.split()[2])
119 elif "#define FRAME_HEIGHT" in line:
120 height =
int(line.split()[2])
122 print(
"Found in " + config_file +
": width = "+
str(width) +
", height = "+
str(height))
123 total_size = height * width
125 print(
"Coudln't find FRAME_WIDTH or FRAME_HEIGHT in "+ config_file +
". Aborting...")
132 cap = video.create_capture(fn)
136 lock = manager.Lock()
142 if (h>height)
and (w>width):
143 roi_x_pos =
int((w-width) /2)
144 roi_y_pos =
int((h-height)/2)
145 crop_img = img[
int(roi_y_pos):
int(roi_y_pos+height),
int(roi_x_pos):
int(roi_x_pos+width)]
148 print(
"WARNING: The input image of [", h ,
" x ", w ,
"] is not bigger to crop a ROI of [", height ,
" x ", width,
"]. Will just resize")
150 min_size = np.amin([np.amin([h,w]), np.amin([height,width])])
152 crop_img = img[
int(h/2-min_size/2):
int(h/2+min_size/2),
int(w/2-min_size/2):
int(w/2+min_size/2)]
155 if ((crop_img.shape[0] != height)
or (crop_img.shape[1] != width)):
156 print(
"WARNING: The image was resized from [", crop_img.shape[0] ,
" x ", crop_img.shape[1] ,
"] to [", height ,
" x ", width,
"]")
157 resized = cv.resize(crop_img , (size, size), interpolation=interpolation)
165 h_orig, w_orig = orig.shape[:2]
166 h_frame, w_frame = frame.shape[:2]
170 if (h_orig>h_frame)
and (w_orig>w_frame):
171 roi_x_pos =
int((w_orig-w_frame)/2)
172 roi_y_pos =
int((h_orig-h_frame)/2)
173 frame_backtorgb = cv.cvtColor(frame,cv.COLOR_GRAY2RGB)
174 patched_img[
int(roi_y_pos):
int(roi_y_pos+h_frame),
int(roi_x_pos):
int(roi_x_pos+w_frame),:] = frame_backtorgb
177 print(
"WARNING: The input image of [", h_orig ,
" x ", w_orig ,
"] is not bigger to embed a ROI of [", h_frame ,
" x ", w_frame,
"]. Will just resize")
178 print(
"after 1st if")
180 if ((patched_img.shape[0] != h_orig)
or (patched_img.shape[1] != w_orig)):
181 print(
"WARNING: The image was resized from [", patched_img.shape[0] ,
" x ", patched_img.shape[1] ,
"] to [", h_orig ,
" x ", w_orig,
"]")
182 resized = cv.resize(patched_img , (w_orig, h_orig), interpolation=interpolation)
184 resized = patched_img
189 def process_frame(frame, t0, threaded_mode, accel_mode, fpga, fpgas):
192 frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
202 image = frame.flatten()
203 output_array = _trieres_warp_transform_numpi.warp_transform(image, total_size, fpga[0], fpga[1])
206 frame = np.reshape(output_array, (height, width))
208 print(
"Declare free the fpga: "+
str(fpga))
213 fpgas.appendleft(fpga)
217 frame = cv.medianBlur(frame, 9)
220 print(
"returning from process_frame")
227 fpgas = deque([[
"10.12.200.225" ,
"2718"],
228 [
"10.12.200.224" ,
"2719"]])
236 threadn = cv.getNumberOfCPUs()
237 pool = ThreadPool(processes = threadn)
240 latency = StatValue()
241 frame_interval = StatValue()
242 last_frame_time =
clock()
245 print(
"Before while len(pending)="+
str(len(pending)))
246 while len(pending) > 0
and pending[0].ready() :
247 print(
"After while len(pending)="+
str(len(pending)))
248 print(
"Before pending.popleft().get()")
249 res, t0 = pending.popleft().get()
250 print(
"After pending.popleft().get(): len(pending)="+
str(len(pending)))
254 latency.update(
clock() - t0)
255 draw_str(res, (20, 20),
"threaded : " +
str(threaded_mode))
256 draw_str(res, (20, 40),
"cloudFPA : " +
str(accel_mode))
257 draw_str(res, (20, 60),
"latency : %.1f ms" % (latency.value*1000))
258 draw_str(res, (20, 80),
"frame interval : %.1f ms" % (frame_interval.value*1000))
259 draw_str(res, (20, 100),
"FPS : %.1f" % (1.0/frame_interval.value))
267 if len(pending) < threadn:
268 _ret, frame = cap.read()
270 print(
"Reached EOF.")
271 print(
"Saved video: " + video_name)
276 frame_interval.update(t - last_frame_time)
282 fpga = fpgas.popleft()
283 print(
"Reserved the fpga:"+
str(fpga))
288 task = pool.apply_async(process_frame, (frame.copy(), t, threaded_mode, accel_mode, fpga, fpgas))
291 task =
DummyTask(process_frame(frame, t, threaded_mode, accel_mode, fpga, fpgas))
295 print(
"Waiting for a free fpga")
297 print(
"Waiting for a free thread")
304 threaded_mode =
not threaded_mode
306 accel_mode =
not accel_mode
315 print(
"[INFO] elasped time: {:.2f}".format(fps.elapsed()))
316 print(
"[INFO] approx. FPS: {:.2f}".format(fps.fps()))
318 if __name__ ==
'__main__':
def draw_str(dst, target, s)