cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
test_harris_numpi_video_threaded.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # *****************************************************************************
4 # * cloudFPGA
5 # * Copyright 2016 -- 2022 IBM Corporation
6 # * Licensed under the Apache License, Version 2.0 (the "License");
7 # * you may not use this file except in compliance with the License.
8 # * You may obtain a copy of the License at
9 # *
10 # * http://www.apache.org/licenses/LICENSE-2.0
11 # *
12 # * Unless required by applicable law or agreed to in writing, software
13 # * distributed under the License is distributed on an "AS IS" BASIS,
14 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # * See the License for the specific language governing permissions and
16 # * limitations under the License.
17 # *----------------------------------------------------------------------------
18 
19 
23 
24 '''
25 Harris multithreaded video processing sample.
26 Usage:
27  test_harris_numpi_video_threaded.py {<video device number>|<video file name>}
28 
29  Shows how python threading capabilities can be used
30  to organize parallel captured frame processing pipeline
31  for smoother playback.
32 
33 Keyboard shortcuts:
34 
35  ESC - exit
36  f - switch between CPU and cloudFPGA version (pre-programming is required)
37  space - switch between multi and single threaded processing
38 '''
39 
40 # Python 2/3 compatibility
41 from __future__ import print_function
42 
43 import sys
44 import os
# Directory appended to sys.path ahead of the helper-module imports below.
# os.path.join keeps the path valid whether or not cFpRootDir ends with a
# path separator (plain string concatenation silently required one).
video_common_lib = os.path.join(os.environ['cFpRootDir'], "HOST/vision/common/languages/python/var")
sys.path.append(video_common_lib)
47 
48 import numpy as np
49 import cv2 as cv
50 
51 from multiprocessing.pool import ThreadPool
52 from collections import deque
53 
54 from common import clock, draw_str, StatValue
55 import video
56 
# Directory appended to sys.path ahead of the _trieres_harris_numpi import
# (presumably the build output of that extension module -- confirm).
# os.path.join keeps the path valid whether or not cFpRootDir ends with a
# path separator (plain string concatenation silently required one).
trieres_lib = os.path.join(os.environ['cFpRootDir'], "HOST/vision/harris/languages/python/build")
sys.path.append(trieres_lib)
59 
60 import _trieres_harris_numpi
61 
# Size of the image to be processed on the FPGA; the programmed bitstream
# must already be fixed to these dimensions.
height = width = 512
# Number of pixels in one flattened frame handed to the accelerator.
total_size = height * width
65 
66 # import the necessary packages
67 import datetime
class FPS:
    """Measure the average frame rate between a start() and a stop() call.

    Typical use: ``fps = FPS().start()``, call ``update()`` once per frame,
    call ``stop()`` when finished, then read ``elapsed()`` and ``fps()``.
    """

    def __init__(self):
        # Start time, end time, and the number of frames that were counted
        # between the two.
        self._start = None
        self._end = None
        self._numFrames = 0

    def start(self):
        """Record the start time and return self so the call can be chained."""
        self._start = datetime.datetime.now()
        return self

    def stop(self):
        """Record the end time of the measured interval."""
        self._end = datetime.datetime.now()

    def update(self):
        """Count one more frame examined during the measured interval."""
        self._numFrames += 1

    def elapsed(self):
        """Return the number of seconds between start() and stop().

        Both start() and stop() must have been called beforehand.
        """
        return (self._end - self._start).total_seconds()

    def fps(self):
        """Return the (approximate) frames per second over the interval.

        Returns 0.0 when the measured interval has zero length, instead of
        raising ZeroDivisionError.
        """
        seconds = self.elapsed()
        if seconds == 0:
            return 0.0
        return self._numFrames / seconds
93 
94 
95 
96 
class DummyTask:
    """Already-completed stand-in for a thread-pool async result.

    Exposes the same ``ready()`` / ``get()`` calls the main loop uses on
    pending tasks, so a synchronously computed result can sit in the same
    queue as results produced by ``pool.apply_async``.
    """

    def __init__(self, data):
        # The precomputed result handed back by get().
        self.data = data

    def ready(self):
        """Always True: the result was computed before construction."""
        return True

    def get(self):
        """Return the stored result."""
        return self.data
104 
def main():
    """Capture frames, process them (FPGA or CPU), and report timing.

    The video source is taken from the first command-line argument and
    defaults to 0 when none is given. Frames are read from the capture,
    dispatched to ``process_frame`` either through a thread pool or
    synchronously, and finished results are annotated with timing
    statistics. Each frame read from the capture is also written to
    ``<source>_out.avi``.

    Keys: SPACE toggles threaded mode, 'f' toggles the accelerator mode,
    ESC exits.
    """
    import sys

    # Only a missing argument should trigger the default source; a bare
    # except would also hide unrelated errors (e.g. KeyboardInterrupt).
    try:
        fn = sys.argv[1]
    except IndexError:
        fn = 0
    cap = video.create_capture(fn)
    fps = FPS().start()

    video_name = str(fn) + "_out.avi"
    video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc('M', 'J', 'P', 'G'), 10, (width, height))

    # Free-list of [ip, port] accelerator endpoints. An entry is popped when
    # a frame is dispatched in accelerator mode and pushed back by
    # process_frame once that frame is done.
    fpgas = deque([["10.12.200.3", "2718"],
                   ["10.12.200.165", "2719"]])

    def process_frame(frame, t0, accel_mode, fpga):
        """Process one frame and return ``(frame, t0)``.

        In accelerator mode the frame is converted to grayscale, resized to
        (height, width) if needed, flattened and sent to the endpoint
        ``fpga`` ([ip, port]); the reply is reshaped to (height, width).
        Otherwise three median blurs are applied on the CPU.
        """
        if accel_mode:
            try:
                print("Will execute on fpga with ip:port: "+fpga[0]+":"+fpga[1])
                # Converting to grayscale
                frame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
                # Adjusting the image size if needed
                if ((frame.shape[0] != height) or (frame.shape[1] != width)):
                    print("WARNING: The image was resized from [", frame.shape[0] , " x ", frame.shape[1] , "] to [", height , " x ", width, "]")
                    dim = (width, height)
                    frame = cv.resize(frame, dim, interpolation = cv.INTER_LINEAR)
                # Flattening the image from 2D to 1D
                image = frame.flatten()
                output_array = _trieres_harris_numpi.harris(image, total_size, fpga[0], fpga[1])
                # Convert 1D array to a 2D numpy array
                frame = np.reshape(output_array, (height, width))
            finally:
                # Always hand the endpoint back, even when the call above
                # raised; otherwise the free-list shrinks permanently and
                # the main loop ends up waiting for an FPGA forever.
                print("Declare free the fpga: "+str(fpga))
                fpgas.appendleft(fpga)
        else:
            # CPU path: some intensive computation...
            frame = cv.medianBlur(frame, 19)
            frame = cv.medianBlur(frame, 19)
            frame = cv.medianBlur(frame, 19)
        return frame, t0

    threadn = 4 #cv.getNumberOfCPUs()
    pool = ThreadPool(processes = threadn)
    pending = deque()

    threaded_mode = True
    accel_mode = True
    latency = StatValue()
    frame_interval = StatValue()
    last_frame_time = clock()
    try:
        while True:
            # Drain finished tasks in submission order and annotate them.
            while len(pending) > 0 and pending[0].ready() and len(fpgas) > 0:
                res, t0 = pending.popleft().get()
                latency.update(clock() - t0)
                # video_out.write(res)
                draw_str(res, (20, 20), "threaded : " + str(threaded_mode))
                draw_str(res, (20, 40), "cloudFPA : " + str(accel_mode))
                draw_str(res, (20, 60), "latency : %.1f ms" % (latency.value*1000))
                draw_str(res, (20, 80), "frame interval : %.1f ms" % (frame_interval.value*1000))
                draw_str(res, (20, 100), "FPS : %.1f" % (1.0/frame_interval.value))
                #cv.imshow('threaded video', res)
            # Submit a new frame only when a worker slot and an endpoint
            # are both available.
            if len(pending) < threadn and len(fpgas) != 0:
                _ret, frame = cap.read()
                if not _ret:
                    print("Reached EOF.")
                    print("Saved video: " + video_name)
                    break
                # NOTE(review): the writer was opened for (width, height)
                # frames but receives the unresized capture frame -- confirm
                # the source resolution matches.
                video_out.write(frame)
                t = clock()
                frame_interval.update(t - last_frame_time)
                last_frame_time = t
                # update the FPS counter
                fps.update()
                if accel_mode:
                    fpga = fpgas.popleft()
                else:
                    fpga = 0
                if threaded_mode:
                    task = pool.apply_async(process_frame, (frame.copy(), t, accel_mode, fpga))
                else:
                    task = DummyTask(process_frame(frame, t, accel_mode, fpga))
                pending.append(task)
            else:
                if accel_mode:
                    print("Waiting for a free fpga")

            ch = cv.waitKey(1)
            if ch == ord(' '):
                threaded_mode = not threaded_mode
            if ch == ord('f'):
                accel_mode = not accel_mode
            if ch == 27:
                break
    finally:
        # Finalise the output file on every exit path (EOF, ESC, or an
        # exception), not only on EOF.
        video_out.release()

    print('Done')

    # stop the timer and display FPS information
    fps.stop()
    print("[INFO] elasped time: {:.2f}".format(fps.elapsed()))
    print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
209 
# Script entry point: print the usage text from the module docstring, then
# run the processing loop.
if __name__ == '__main__':
    print(__doc__)
    main()
    #cv.destroyAllWindows()
214 
def clock()
Definition: common.py:174
def draw_str(dst, target, s)
Definition: common.py:113