cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
test_warp_transform_numpi_video_threaded.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # *****************************************************************************
4 # * cloudFPGA
5 # * Copyright 2016 -- 2022 IBM Corporation
6 # * Licensed under the Apache License, Version 2.0 (the "License");
7 # * you may not use this file except in compliance with the License.
8 # * You may obtain a copy of the License at
9 # *
10 # * http://www.apache.org/licenses/LICENSE-2.0
11 # *
12 # * Unless required by applicable law or agreed to in writing, software
13 # * distributed under the License is distributed on an "AS IS" BASIS,
14 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # * See the License for the specific language governing permissions and
16 # * limitations under the License.
17 # *----------------------------------------------------------------------------
18 
19 
23 
24 '''
25 WarpTransform multithreaded video processing sample.
26 Usage:
27  test_warp_transform_numpi_video_threaded.py {<video device number>|<video file name>}
28 
29  Shows how python threading capabilities can be used
30  to organize parallel captured frame processing pipeline
31  for smoother playback.
32 
33 Keyboard shortcuts:
34 
35  ESC - exit
36  f - switch between CPU and cloudFPGA version (pre-programming is required)
37  space - switch between multi and single threaded processing
38 '''
39 
40 # Python 2/3 compatibility
41 from __future__ import print_function
42 
43 import sys
44 import os
45 video_common_lib=os.environ['cFpRootDir'] + "HOST/vision/common/languages/python/var"
46 sys.path.append(video_common_lib)
47 
48 import numpy as np
49 import cv2 as cv
50 
51 import multiprocessing
52 
53 from multiprocessing.pool import ThreadPool
54 from collections import deque
55 
56 # Manager to create shared object.
57 manager = multiprocessing.Manager()
58 from threading import Lock
59 
60 from common import clock, draw_str, StatValue
61 import video
62 import time
63 
64 trieres_lib=os.environ['cFpRootDir'] + "HOST/vision/warp_transform/languages/python/build"
65 sys.path.append(trieres_lib)
66 
67 import _trieres_warp_transform_numpi
68 
69 ROI = True
70 
71 # import the necessary packages
72 import datetime
73 class FPS:
74  def __init__(self):
75  # store the start time, end time, and total number of frames
76  # that were examined between the start and end intervals
77  self._start_start = None
78  self._end_end = None
79  self._numFrames_numFrames = 0
80  def start(self):
81  # start the timer
82  self._start_start = datetime.datetime.now()
83  return self
84  def stop(self):
85  # stop the timer
86  self._end_end = datetime.datetime.now()
87  def update(self):
88  # increment the total number of frames examined during the
89  # start and end intervals
90  self._numFrames_numFrames += 1
91  def elapsed(self):
92  # return the total number of seconds between the start and
93  # end interval
94  return (self._end_end - self._start_start).total_seconds()
95  def fps(self):
96  # compute the (approximate) frames per second
97  return self._numFrames_numFrames / self.elapsedelapsed()
98 
99 
100 
101 
102 class DummyTask:
103  def __init__(self, data):
104  self.datadata = data
105  def ready(self):
106  return True
107  def get(self):
108  return self.datadata
109 
110 def main():
111  import sys
112 
113  config_file=os.environ['cFpRootDir'] + "HOST/vision/warp_transform/languages/cplusplus/include/config.h"
114 
115  with open(config_file) as cfg:
116  for line in cfg:
117  if "#define FRAME_WIDTH" in line:
118  width = int(line.split()[2])
119  elif "#define FRAME_HEIGHT" in line:
120  height = int(line.split()[2])
121  try:
122  print("Found in " + config_file + ": width = "+str(width) + ", height = "+str(height))
123  total_size = height * width
124  except:
125  print("Coudln't find FRAME_WIDTH or FRAME_HEIGHT in "+ config_file + ". Aborting...")
126  exit(0)
127 
128  try:
129  fn = sys.argv[1]
130  except:
131  fn = 0
132  cap = video.create_capture(fn)
133  fps = FPS().start()
134 
135  # Create a lock.
136  lock = manager.Lock()
137  #lock = Lock()
138 
139  def crop_square_roi(img, size, interpolation=cv.INTER_AREA):
140  h, w = img.shape[:2]
141  if ROI:
142  if (h>height) and (w>width):
143  roi_x_pos = int((w-width) /2)
144  roi_y_pos = int((h-height)/2)
145  crop_img = img[int(roi_y_pos):int(roi_y_pos+height), int(roi_x_pos):int(roi_x_pos+width)]
146  else:
147  crop_img = img
148  print("WARNING: The input image of [", h , " x ", w , "] is not bigger to crop a ROI of [", height , " x ", width, "]. Will just resize")
149  else:
150  min_size = np.amin([np.amin([h,w]), np.amin([height,width])])
151  # Centralize and crop
152  crop_img = img[int(h/2-min_size/2):int(h/2+min_size/2), int(w/2-min_size/2):int(w/2+min_size/2)]
153 
154  # Adjusting the image file if needed
155  if ((crop_img.shape[0] != height) or (crop_img.shape[1] != width)):
156  print("WARNING: The image was resized from [", crop_img.shape[0] , " x ", crop_img.shape[1] , "] to [", height , " x ", width, "]")
157  resized = cv.resize(crop_img , (size, size), interpolation=interpolation)
158  else:
159  resized = crop_img
160  return resized
161 
162 
163 
164  def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA):
165  h_orig, w_orig = orig.shape[:2]
166  h_frame, w_frame = frame.shape[:2]
167 
168  patched_img = orig
169 
170  if (h_orig>h_frame) and (w_orig>w_frame):
171  roi_x_pos = int((w_orig-w_frame)/2)
172  roi_y_pos = int((h_orig-h_frame)/2)
173  frame_backtorgb = cv.cvtColor(frame,cv.COLOR_GRAY2RGB)
174  patched_img[int(roi_y_pos):int(roi_y_pos+h_frame), int(roi_x_pos):int(roi_x_pos+w_frame),:] = frame_backtorgb
175  else:
176  patched_img = frame
177  print("WARNING: The input image of [", h_orig , " x ", w_orig , "] is not bigger to embed a ROI of [", h_frame , " x ", w_frame, "]. Will just resize")
178  print("after 1st if")
179  # Adjusting the image file if needed
180  if ((patched_img.shape[0] != h_orig) or (patched_img.shape[1] != w_orig)):
181  print("WARNING: The image was resized from [", patched_img.shape[0] , " x ", patched_img.shape[1] , "] to [", h_orig , " x ", w_orig, "]")
182  resized = cv.resize(patched_img , (w_orig, h_orig), interpolation=interpolation)
183  else:
184  resized = patched_img
185  return resized
186 
187 
188 
189  def process_frame(frame, t0, threaded_mode, accel_mode, fpga, fpgas):
190  # Converting to grayscale
191  orig = frame
192  frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
193 
194  # Adjusting the image file if needed
195 
196  frame = crop_square_roi(frame, width, interpolation = cv.INTER_AREA)
197 
198  if accel_mode:
199  #print("Will execute on fpga with ip:port: "+fpga[0]+":"+fpga[1])
200  # some intensive computation...
201  # Flattening the image from 2D to 1D
202  image = frame.flatten()
203  output_array = _trieres_warp_transform_numpi.warp_transform(image, total_size, fpga[0], fpga[1])
204  # Convert 1D array to a 2D numpy array
205  #time.sleep(1)
206  frame = np.reshape(output_array, (height, width))
207  print(type(frame))
208  print("Declare free the fpga: "+str(fpga))
209  lock.acquire()
210  if threaded_mode:
211  fpgas.append(fpga)
212  else:
213  fpgas.appendleft(fpga)
214  lock.release()
215  else:
216  #time.sleep(10)
217  frame = cv.medianBlur(frame, 9)
218  if ROI:
219  frame = patch_sqaure_roi(orig, frame, cv.INTER_AREA)
220  print("returning from process_frame")
221  return frame, t0
222 
223 
224  threaded_mode = True
225  accel_mode = True
226 
227  fpgas = deque([["10.12.200.225" , "2718"],
228  ["10.12.200.224" , "2719"]])
229 # ["10.12.200.11" , "2720"],
230 # ["10.12.200.19" , "2721"],
231 # ["10.12.200.29" , "2722"]])
232 
233  if accel_mode:
234  threadn = len(fpgas)
235  else:
236  threadn = cv.getNumberOfCPUs()
237  pool = ThreadPool(processes = threadn)
238  pending = deque()
239 
240  latency = StatValue()
241  frame_interval = StatValue()
242  last_frame_time = clock()
243  while True:
244  fpga = 0
245  print("Before while len(pending)="+str(len(pending)))
246  while len(pending) > 0 and pending[0].ready() :
247  print("After while len(pending)="+str(len(pending)))
248  print("Before pending.popleft().get()")
249  res, t0 = pending.popleft().get()
250  print("After pending.popleft().get(): len(pending)="+str(len(pending)))
251  print(type(fpga))
252  print(str(fpga))
253  #exit(0)
254  latency.update(clock() - t0)
255  draw_str(res, (20, 20), "threaded : " + str(threaded_mode))
256  draw_str(res, (20, 40), "cloudFPA : " + str(accel_mode))
257  draw_str(res, (20, 60), "latency : %.1f ms" % (latency.value*1000))
258  draw_str(res, (20, 80), "frame interval : %.1f ms" % (frame_interval.value*1000))
259  draw_str(res, (20, 100), "FPS : %.1f" % (1.0/frame_interval.value))
260 # try:
261 # video_out.write(res)
262 # except:
263 # video_name = str(fn)+"_out.avi"
264 # video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc('M','J','P','G'), 30, (res.shape[1],res.shape[0]))
265  #print("video_out Size is:"+str(res.shape[1])+","+str(res.shape[0]))
266  #cv.imshow('threaded video', res)
267  if len(pending) < threadn: # and len(fpgas) != 0:
268  _ret, frame = cap.read()
269  if _ret is False:
270  print("Reached EOF.")
271  print("Saved video: " + video_name)
272  #video_out.release()
273  break
274  #print("frame Size is:"+str(frame.shape[1])+","+str(frame.shape[0]))
275  t = clock()
276  frame_interval.update(t - last_frame_time)
277  last_frame_time = t
278  # update the FPS counter
279  fps.update()
280  if accel_mode:
281  lock.acquire()
282  fpga = fpgas.popleft()
283  print("Reserved the fpga:"+str(fpga))
284  lock.release()
285  else:
286  fpga = 0
287  if threaded_mode:
288  task = pool.apply_async(process_frame, (frame.copy(), t, threaded_mode, accel_mode, fpga, fpgas))
289  fpga = 0
290  else:
291  task = DummyTask(process_frame(frame, t, threaded_mode, accel_mode, fpga, fpgas))
292  pending.append(task)
293  else:
294  if accel_mode:
295  print("Waiting for a free fpga")
296  else:
297  print("Waiting for a free thread")
298  #if accel_mode and type(fpga) is list:
299  # print("Declare free the fpga: "+str(fpga))
300  # fpgas.appendleft(fpga)
301 
302  ch = cv.waitKey(1)
303  if ch == ord(' '):
304  threaded_mode = not threaded_mode
305  if ch == ord('f'):
306  accel_mode = not accel_mode
307  if ch == 27:
308  break
309 
310 
311  print('Done')
312 
313  # stop the timer and display FPS information
314  fps.stop()
315  print("[INFO] elasped time: {:.2f}".format(fps.elapsed()))
316  print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
317 
318 if __name__ == '__main__':
319  print(__doc__)
320  main()
321  #cv.destroyAllWindows()
322 
def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA, debug_level=debug_level)
def crop_square_roi(img, size, interpolation=cv.INTER_AREA, debug_level=debug_level)
def clock()
Definition: common.py:174
def draw_str(dst, target, s)
Definition: common.py:113