cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
test_median_blur_numpi_video_threaded.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # *****************************************************************************
4 # * cloudFPGA
5 # * Copyright 2016 -- 2022 IBM Corporation
6 # * Licensed under the Apache License, Version 2.0 (the "License");
7 # * you may not use this file except in compliance with the License.
8 # * You may obtain a copy of the License at
9 # *
10 # * http://www.apache.org/licenses/LICENSE-2.0
11 # *
12 # * Unless required by applicable law or agreed to in writing, software
13 # * distributed under the License is distributed on an "AS IS" BASIS,
14 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # * See the License for the specific language governing permissions and
16 # * limitations under the License.
17 # *----------------------------------------------------------------------------
18 
19 
23 
24 '''
25 MedianBlur multithreaded video processing sample.
26 Usage:
27  test_median_blur_numpi_video_threaded.py {<video device number>|<video file name>}
28 
29  Shows how python threading capabilities can be used
30  to organize parallel captured frame processing pipeline
31  for smoother playback.
32 
33 Keyboard shortcuts:
34 
35  ESC - exit
36  f - switch between CPU and cloudFPGA version (pre-programming is required)
37  space - switch between multi and single threaded processing
38 '''
39 
40 # Python 2/3 compatibility
41 from __future__ import print_function
42 
43 import sys
44 import os
45 video_common_lib=os.environ['cFpRootDir'] + "HOST/vision/common/languages/python/var"
46 sys.path.append(video_common_lib)
47 
48 import numpy as np
49 import cv2 as cv
50 
51 import multiprocessing
52 
53 from multiprocessing.pool import ThreadPool
54 from collections import deque
55 
56 # Manager to create shared object.
57 manager = multiprocessing.Manager()
58 from threading import Lock
59 
60 from common import clock, draw_str, StatValue
61 import video
62 import time
63 
64 #trieres_lib=os.environ['cFpRootDir'] + "HOST/vision/median_blur/languages/python/build"
65 #sys.path.append(trieres_lib)
66 #import trieres
67 
68 from trieres import *
69 
70 ROI = True
71 
72 # import the necessary packages
73 import datetime
74 class FPS:
75  def __init__(self):
76  # store the start time, end time, and total number of frames
77  # that were examined between the start and end intervals
78  self._start_start = None
79  self._end_end = None
80  self._numFrames_numFrames = 0
81  def start(self):
82  # start the timer
83  self._start_start = datetime.datetime.now()
84  return self
85  def stop(self):
86  # stop the timer
87  self._end_end = datetime.datetime.now()
88  def update(self):
89  # increment the total number of frames examined during the
90  # start and end intervals
91  self._numFrames_numFrames += 1
92  def elapsed(self):
93  # return the total number of seconds between the start and
94  # end interval
95  return (self._end_end - self._start_start).total_seconds()
96  def fps(self):
97  # compute the (approximate) frames per second
98  return self._numFrames_numFrames / self.elapsedelapsed()
99 
100 
101 
102 
103 class DummyTask:
104  def __init__(self, data):
105  self.datadata = data
106  def ready(self):
107  return True
108  def get(self):
109  return self.datadata
110 
111 def main():
112  import sys
113 
114  config_file=os.environ['cFpRootDir'] + "HOST/vision/median_blur/languages/cplusplus/include/config.h"
115 
116  with open(config_file) as cfg:
117  for line in cfg:
118  if "#define FRAME_WIDTH" in line:
119  width = int(line.split()[2])
120  elif "#define FRAME_HEIGHT" in line:
121  height = int(line.split()[2])
122  try:
123  print("Found in " + config_file + ": width = "+str(width) + ", height = "+str(height))
124  total_size = height * width
125  except:
126  print("Coudln't find FRAME_WIDTH or FRAME_HEIGHT in "+ config_file + ". Aborting...")
127  exit(0)
128 
129  try:
130  fn = sys.argv[1]
131  except:
132  fn = 0
133  cap = video.create_capture(fn)
134  fps = FPS().start()
135 
136  # Create a lock.
137  lock = manager.Lock()
138  #lock = Lock()
139 
140  def crop_square_roi(img, size, interpolation=cv.INTER_AREA):
141  h, w = img.shape[:2]
142  if ROI:
143  if (h>height) and (w>width):
144  roi_x_pos = int((w-width) /2)
145  roi_y_pos = int((h-height)/2)
146  crop_img = img[int(roi_y_pos):int(roi_y_pos+height), int(roi_x_pos):int(roi_x_pos+width)]
147  else:
148  crop_img = img
149  print("WARNING: The input image of [", h , " x ", w , "] is not bigger to crop a ROI of [", height , " x ", width, "]. Will just resize")
150  else:
151  min_size = np.amin([np.amin([h,w]), np.amin([height,width])])
152  # Centralize and crop
153  crop_img = img[int(h/2-min_size/2):int(h/2+min_size/2), int(w/2-min_size/2):int(w/2+min_size/2)]
154 
155  # Adjusting the image file if needed
156  if ((crop_img.shape[0] != height) or (crop_img.shape[1] != width)):
157  print("WARNING: The image was resized from [", crop_img.shape[0] , " x ", crop_img.shape[1] , "] to [", height , " x ", width, "]")
158  resized = cv.resize(crop_img , (size, size), interpolation=interpolation)
159  else:
160  resized = crop_img
161  return resized
162 
163 
164 
165  def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA):
166  h_orig, w_orig = orig.shape[:2]
167  h_frame, w_frame = frame.shape[:2]
168 
169  patched_img = orig
170 
171  if (h_orig>h_frame) and (w_orig>w_frame):
172  roi_x_pos = int((w_orig-w_frame)/2)
173  roi_y_pos = int((h_orig-h_frame)/2)
174  frame_backtorgb = cv.cvtColor(np.float32(frame),cv.COLOR_GRAY2RGB)
175  patched_img[int(roi_y_pos):int(roi_y_pos+h_frame), int(roi_x_pos):int(roi_x_pos+w_frame),:] = frame_backtorgb
176  else:
177  patched_img = frame
178  print("WARNING: The input image of [", h_orig , " x ", w_orig , "] is not bigger to embed a ROI of [", h_frame , " x ", w_frame, "]. Will just resize")
179  print("after 1st if")
180  # Adjusting the image file if needed
181  if ((patched_img.shape[0] != h_orig) or (patched_img.shape[1] != w_orig)):
182  print("WARNING: The image was resized from [", patched_img.shape[0] , " x ", patched_img.shape[1] , "] to [", h_orig , " x ", w_orig, "]")
183  resized = cv.resize(patched_img , (w_orig, h_orig), interpolation=interpolation)
184  else:
185  resized = patched_img
186  return resized
187 
188 
189 
190  def process_frame(frame, t0, threaded_mode, accel_mode, fpga, fpgas):
191  # Converting to grayscale
192  orig = frame
193  frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
194 
195  # Adjusting the image file if needed
196 
197  frame = crop_square_roi(frame, width, interpolation = cv.INTER_AREA)
198 
199  if accel_mode:
200  #print("Will execute on fpga with ip:port: "+fpga[0]+":"+fpga[1])
201  # some intensive computation...
202  # Flattening the image from 2D to 1D
203  image = frame.flatten()
204  #output_array = _trieres_median_blur_numpi.median_blur(image, total_size, fpga[0], fpga[1])
205  output_array = trieres.vision.median_blur(image, total_size, fpga[0], int(fpga[1]))
206  # Convert 1D array to a 2D numpy array
207  #time.sleep(1)
208  frame = np.reshape(output_array, (height, width))
209  print(type(frame))
210  print("Declare free the fpga: "+str(fpga))
211  lock.acquire()
212  if threaded_mode:
213  fpgas.append(fpga)
214  else:
215  fpgas.appendleft(fpga)
216  lock.release()
217  else:
218  #time.sleep(10)
219  frame = cv.medianBlur(frame, 9)
220  if ROI:
221  frame = patch_sqaure_roi(orig, frame, cv.INTER_AREA)
222  print("returning from process_frame")
223  return frame, t0
224 
225 
226  threaded_mode = False
227  accel_mode = False
228 
229  fpgas = deque([["10.12.200.73" , "2718"]]) #,
230 # ["10.12.200.224" , "2719"]])
231 # ["10.12.200.11" , "2720"],
232 # ["10.12.200.19" , "2721"],
233 # ["10.12.200.29" , "2722"]])
234 
235  if accel_mode:
236  threadn = len(fpgas)
237  else:
238  threadn = cv.getNumberOfCPUs()
239  pool = ThreadPool(processes = threadn)
240  pending = deque()
241 
242  latency = StatValue()
243  frame_interval = StatValue()
244  last_frame_time = clock()
245  while True:
246  fpga = 0
247  print("Before while len(pending)="+str(len(pending)))
248  while len(pending) > 0 and pending[0].ready() :
249  print("After while len(pending)="+str(len(pending)))
250  print("Before pending.popleft().get()")
251  res, t0 = pending.popleft().get()
252  print("After pending.popleft().get(): len(pending)="+str(len(pending)))
253  print(type(fpga))
254  print(str(fpga))
255  #exit(0)
256  latency.update(clock() - t0)
257  draw_str(res, (20, 20), "threaded : " + str(threaded_mode))
258  draw_str(res, (20, 40), "cloudFPGA : " + str(accel_mode))
259  draw_str(res, (20, 60), "latency : %.1f ms" % (latency.value*1000))
260  draw_str(res, (20, 80), "frame interval : %.1f ms" % (frame_interval.value*1000))
261  draw_str(res, (20, 100), "FPS : %.1f" % (1.0/frame_interval.value))
262  try:
263  video_out.write(res)
264  except:
265  video_name = str(fn)+"_out.avi"
266  video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc('M','J','P','G'), 30, (res.shape[1],res.shape[0]))
267  #print("video_out Size is:"+str(res.shape[1])+","+str(res.shape[0]))
268  cv.imshow('threaded video', res)
269  if len(pending) < threadn: # and len(fpgas) != 0:
270  _ret, frame = cap.read()
271  if _ret is False:
272  print("Reached EOF.")
273  print("Saved video: " + video_name)
274  #video_out.release()
275  break
276  #print("frame Size is:"+str(frame.shape[1])+","+str(frame.shape[0]))
277  t = clock()
278  frame_interval.update(t - last_frame_time)
279  last_frame_time = t
280  # update the FPS counter
281  fps.update()
282  if accel_mode:
283  lock.acquire()
284  fpga = fpgas.popleft()
285  print("Reserved the fpga:"+str(fpga))
286  lock.release()
287  else:
288  fpga = 0
289  if threaded_mode:
290  task = pool.apply_async(process_frame, (frame.copy(), t, threaded_mode, accel_mode, fpga, fpgas))
291  fpga = 0
292  else:
293  task = DummyTask(process_frame(frame, t, threaded_mode, accel_mode, fpga, fpgas))
294  pending.append(task)
295  else:
296  if accel_mode:
297  print("Waiting for a free fpga")
298  else:
299  print("Waiting for a free thread")
300  #if accel_mode and type(fpga) is list:
301  # print("Declare free the fpga: "+str(fpga))
302  # fpgas.appendleft(fpga)
303 
304  ch = cv.waitKey(1)
305  if ch == ord(' '):
306  threaded_mode = not threaded_mode
307  if ch == ord('f'):
308  accel_mode = not accel_mode
309  if ch == 27:
310  break
311 
312 
313  print('Done')
314 
315  # stop the timer and display FPS information
316  fps.stop()
317  print("[INFO] elasped time: {:.2f}".format(fps.elapsed()))
318  print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
319 
320 if __name__ == '__main__':
321  print(__doc__)
322  main()
323  cv.destroyAllWindows()
324 
def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA, debug_level=debug_level)
def crop_square_roi(img, size, interpolation=cv.INTER_AREA, debug_level=debug_level)
def clock()
Definition: common.py:174
def draw_str(dst, target, s)
Definition: common.py:113