25 Harris multithreaded video processing sample.
27 test_harris_numpi_video_threaded.py {<video device number>|<video file name>}
29 Shows how Python's threading capabilities can be used
30 to organize a parallel processing pipeline for captured frames
31 for smoother playback.
36 f - switch between the CPU and cloudFPGA versions (pre-programming is required)
37 space - switch between multi-threaded and single-threaded processing
41 from __future__
import print_function
45 video_common_lib=os.environ[
'cFpRootDir'] +
"HOST/vision/common/languages/python/var"
46 sys.path.append(video_common_lib)
51 from multiprocessing.pool
import ThreadPool
52 from collections
import deque
54 from common
import clock, draw_str, StatValue
57 trieres_lib=os.environ[
'cFpRootDir'] +
"HOST/vision/harris/languages/python/build"
58 sys.path.append(trieres_lib)
60 import _trieres_harris_numpi
64 total_size = height * width
77 self.
_start_start = datetime.datetime.now()
81 self.
_end_end = datetime.datetime.now()
89 return (self.
_end_end - self.
_start_start).total_seconds()
112 cap = video.create_capture(fn)
115 video_name =
str(fn)+
"_out.avi"
116 video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc(
'M',
'J',
'P',
'G'), 10, (width,height))
118 fpgas = deque([ [
"10.12.200.3" ,
"2718"],
119 [
"10.12.200.165" ,
"2719"]])
122 def process_frame(frame, t0, accel_mode, fpga):
124 print(
"Will execute on fpga with ip:port: "+fpga[0]+
":"+fpga[1])
128 frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
130 if ((frame.shape[0] != height)
or (frame.shape[1] != width)):
131 print(
"WARNING: The image was resized from [", frame.shape[0] ,
" x ", frame.shape[1] ,
"] to [", height ,
" x ", width,
"]")
132 dim = (width, height)
133 frame = cv.resize(frame, dim, interpolation = cv.INTER_LINEAR)
135 image = frame.flatten()
136 output_array = _trieres_harris_numpi.harris(image, total_size, fpga[0], fpga[1])
138 frame = np.reshape(output_array, (height, width))
139 print(
"Declare free the fpga: "+
str(fpga))
140 fpgas.appendleft(fpga)
142 frame = cv.medianBlur(frame, 19)
143 frame = cv.medianBlur(frame, 19)
144 frame = cv.medianBlur(frame, 19)
148 pool = ThreadPool(processes = threadn)
153 latency = StatValue()
154 frame_interval = StatValue()
155 last_frame_time =
clock()
157 while len(pending) > 0
and pending[0].ready()
and len(fpgas) > 0:
158 res, t0 = pending.popleft().get()
159 latency.update(
clock() - t0)
161 draw_str(res, (20, 20),
"threaded : " +
str(threaded_mode))
162 draw_str(res, (20, 40),
"cloudFPA : " +
str(accel_mode))
163 draw_str(res, (20, 60),
"latency : %.1f ms" % (latency.value*1000))
164 draw_str(res, (20, 80),
"frame interval : %.1f ms" % (frame_interval.value*1000))
165 draw_str(res, (20, 100),
"FPS : %.1f" % (1.0/frame_interval.value))
167 if len(pending) < threadn
and len(fpgas) != 0:
168 _ret, frame = cap.read()
170 print(
"Reached EOF.")
171 print(
"Saved video: " + video_name)
175 video_out.write(frame)
177 frame_interval.update(t - last_frame_time)
182 fpga = fpgas.popleft()
186 task = pool.apply_async(process_frame, (frame.copy(), t, accel_mode, fpga))
188 task =
DummyTask(process_frame(frame, t, accel_mode, fpga))
192 print(
"Waiting for a free fpga")
196 threaded_mode =
not threaded_mode
198 accel_mode =
not accel_mode
207 print(
"[INFO] elasped time: {:.2f}".format(fps.elapsed()))
208 print(
"[INFO] approx. FPS: {:.2f}".format(fps.fps()))
210 if __name__ ==
'__main__':
def draw_str(dst, target, s)