cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
test_median_blur_video_threaded_ray.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # *****************************************************************************
4 # * cloudFPGA
5 # * Copyright 2016 -- 2022 IBM Corporation
6 # * Licensed under the Apache License, Version 2.0 (the "License");
7 # * you may not use this file except in compliance with the License.
8 # * You may obtain a copy of the License at
9 # *
10 # * http://www.apache.org/licenses/LICENSE-2.0
11 # *
12 # * Unless required by applicable law or agreed to in writing, software
13 # * distributed under the License is distributed on an "AS IS" BASIS,
14 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # * See the License for the specific language governing permissions and
16 # * limitations under the License.
17 # *----------------------------------------------------------------------------
18 
19 
23 
24 '''
25 Usage:
26  test_median_blur_video_threaded_ray.py <video file name>
27 
28  Shows how python ray capabilities can be used
29  to organize parallel frame processing pipeline
30  with cloudFPGA.
31 
32  On head node the ray server may be started with the command:
33  taskset -c 1 ray start --head --port=6379 --num-cpus=7 --resources='{"cloudFPGA": 7}'
34 
35  Then client nodes can be connected with the command
36  ray start --address='10.12.0.10:6379' --redis-password='5241590000000000' --num-cpus=7 --resources='{"cloudFPGA": 7}'
37  or directly using the ray init method in this script.
38 
39 '''
40 
41 import ray
42 from ray.util.queue import Queue
43 
44 import sys
45 import os
46 import numpy as np
47 import cv2 as cv
48 import logging
49 import time
50 from trieres import *
51 
# Run-time switches for this script.
ROI = False                  # True: process a centred region of interest and patch it back into the frame
accel_mode = True            # True: offload the median blur to a cloudFPGA; False: use cv.medianBlur
debug_level = logging.INFO

# Header holding the frame geometry (FRAME_WIDTH / FRAME_HEIGHT defines).
# os.path.join yields the same path as plain concatenation when cFpRootDir
# ends with a separator, and still works when it does not.
config_file = os.path.join(os.environ['cFpRootDir'],
                           "HOST/vision/median_blur/languages/cplusplus/include/config.h")
logging.basicConfig(level=debug_level)

# Explicit sentinels instead of relying on a NameError to detect missing defines.
width = None
height = None
with open(config_file) as cfg:
    for line in cfg:
        # Expected form: "#define FRAME_WIDTH <int>" -> the value is the third token.
        if "#define FRAME_WIDTH" in line:
            width = int(line.split()[2])
        elif "#define FRAME_HEIGHT" in line:
            height = int(line.split()[2])

if width is None or height is None:
    logging.error("Coudln't find FRAME_WIDTH or FRAME_HEIGHT in %s. Aborting...", config_file)
    # Exit with a non-zero status so callers can tell the run failed.
    sys.exit(1)

logging.info("Found image dimensions in %s: width = %s, height = %s", config_file, width, height)
# Number of pixels in one frame; used when sending a flattened frame to the FPGA.
total_size = height * width
71 
def crop_square_roi(img, size, interpolation=cv.INTER_AREA, debug_level=debug_level):
    """Crop the centre of *img* and, if needed, resize it to ``size`` x ``size``.

    Relies on the module-level ``ROI``, ``height`` and ``width`` settings:

    * ``ROI`` true: take a centred ``height`` x ``width`` window when the image
      is strictly larger than that window in both dimensions; otherwise keep
      the whole image.
    * ``ROI`` false: take a centred square whose side is the smallest of the
      image height/width and the configured ``height``/``width``.

    The crop is resized only when its shape differs from ``height`` x ``width``.

    Args:
        img: Image array; only ``img.shape[:2]`` (rows, cols) is inspected.
        size: Side length, in pixels, of the resized output.
        interpolation: OpenCV interpolation flag passed to ``cv.resize``.
        debug_level: Logging level passed to ``logging.basicConfig``.

    Returns:
        The cropped (and possibly resized) image.
    """
    logging.basicConfig(level=debug_level)

    h, w = img.shape[:2]
    if ROI:
        if h > height and w > width:
            top = (h - height) // 2
            left = (w - width) // 2
            crop_img = img[top:top + height, left:left + width]
        else:
            crop_img = img
            logging.warning(f"The input image of [{h}x{w}] is not bigger to embed a ROI of [{height}x{width}]. Will just resize")
    else:
        min_size = min(h, w, height, width)
        # Centralize and crop (min_size never exceeds h or w, so indices stay non-negative).
        row_start, row_end = (h - min_size) // 2, (h + min_size) // 2
        col_start, col_end = (w - min_size) // 2, (w + min_size) // 2
        crop_img = img[row_start:row_end, col_start:col_end]

    # Adjusting the image if needed.
    if crop_img.shape[0] != height or crop_img.shape[1] != width:
        # Report the size actually produced by cv.resize below. The previous
        # message claimed [height x width], which is only true when
        # size == height == width.
        # NOTE(review): callers presumably pass size == width == height - confirm.
        logging.warning(f"The image was resized from [{crop_img.shape[0]} x {crop_img.shape[1]}] to [{size}x{size}]")
        resized = cv.resize(crop_img, (size, size), interpolation=interpolation)
    else:
        resized = crop_img
    return resized
96 
def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA, debug_level=debug_level):
    """Paste the processed grayscale *frame* into the centre of *orig*.

    When *orig* is strictly larger than *frame* in both dimensions, a copy of
    *orig* is returned with its central region overwritten by *frame*
    (converted from gray to RGB). Otherwise *frame* itself is used and, if its
    shape differs from *orig*, resized to the dimensions of *orig*.

    Args:
        orig: Original image; indexed with three axes when patched.
        frame: Processed single-channel image to embed.
        interpolation: OpenCV interpolation flag used for the fallback resize.
        debug_level: Logging level passed to ``logging.basicConfig``.

    Returns:
        The patched copy of *orig*, or the (possibly resized) *frame*.
    """
    logging.basicConfig(level=debug_level)

    orig_rows, orig_cols = orig.shape[:2]
    roi_rows, roi_cols = frame.shape[:2]

    if orig_rows > roi_rows and orig_cols > roi_cols:
        canvas = orig.copy()
        left = int((orig_cols - roi_cols) / 2)
        top = int((orig_rows - roi_rows) / 2)
        # cvtColor is fed a float32 view of the frame; assigning into the
        # canvas casts the values to the canvas dtype.
        roi_rgb = cv.cvtColor(np.float32(frame), cv.COLOR_GRAY2RGB)
        canvas[top:top + roi_rows, left:left + roi_cols, :] = roi_rgb
    else:
        # NOTE(review): this path hands back the frame without a gray->RGB
        # conversion, unlike the branch above - confirm callers can cope.
        canvas = frame
        logging.warning(f"The input image of [{orig_rows}x{orig_cols}] is not bigger to embed a ROI of [{roi_rows}x{roi_cols}]. Will just resize")

    # Nothing more to do when the result already matches the original size.
    if canvas.shape[0] == orig_rows and canvas.shape[1] == orig_cols:
        return canvas

    logging.warning(f"The image was resized from [{canvas.shape[0]} x {canvas.shape[1]}] to [{orig_rows}x{orig_cols}]")
    return cv.resize(canvas, (orig_cols, orig_rows), interpolation=interpolation)
120 
121 
# Alternative initialisation kept for reference (no cluster address given):
# ray.init(dashboard_port=50051, num_cpus=12)
ray.init(address='ray://10.12.0.10:10001')

# Banner summarising what the connected cluster offers.
cluster_banner = '''This cluster consists of
 {} nodes in total
 {} CPU resources in total
 {} cloudFPGA resources in total
'''
print(cluster_banner.format(
    len(ray.nodes()),
    ray.cluster_resources()['CPU'],
    ray.cluster_resources()['cloudFPGA'],
))

# Shared queue of FPGA endpoints; the handle is passed to every consumer task,
# which takes one endpoint, uses it, and puts it back.
fpgas_queue = Queue(maxsize=100)
133 
@ray.remote(resources={'cloudFPGA': 1})
def consumer(accel_mode, fpgas_queue, frame, debug_level=debug_level):
    """Median-blur one video frame, on a cloudFPGA or on the CPU.

    Runs as a Ray remote task that requires one ``cloudFPGA`` resource.

    Args:
        accel_mode: When true, borrow an endpoint from *fpgas_queue* and call
            ``trieres.vision.median_blur``; otherwise use ``cv.medianBlur``
            with a kernel size of 9.
        fpgas_queue: Queue of FPGA endpoints. Each item is indexed as
            ``item[0]`` and ``int(item[1])`` (presumably host and port).
        frame: Input frame; converted with ``cv.COLOR_RGB2GRAY`` before
            processing.
        debug_level: Logging level for this task and the trieres call.

    Returns:
        The processed frame: patched back into the original frame when the
        module-level ``ROI`` is set, otherwise converted to 3-channel uint8.

    Raises:
        Whatever ``fpgas_queue.get`` raises when no endpoint becomes available
        within 100 seconds, and any error from the FPGA call. The endpoint is
        returned to the queue in either case.
    """
    logging.basicConfig(level=debug_level)

    orig = frame
    frame_ret = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
    # Cropping/resizing to the FPGA frame size is currently disabled:
    # frame_ret = crop_square_roi(frame_ret, width, interpolation = cv.INTER_AREA, debug_level=debug_level)
    if accel_mode:
        next_item = fpgas_queue.get(block=True, timeout=100)
        logging.debug(f"will work on {next_item} and then put in back in the fpgas_queue")
        try:
            # Flattening the image from 2D to 1D
            image = frame_ret.flatten()
            output_array = trieres.vision.median_blur(image, total_size, next_item[0], int(next_item[1]), debug_level=debug_level)
            frame_ret = np.reshape(output_array, (height, width))
        finally:
            # Always hand the endpoint back. Otherwise a failed call would
            # remove this FPGA from the pool and starve the remaining tasks.
            fpgas_queue.put(next_item)
        logging.debug(f"finished working on {next_item} Now it is back in the fpgas_queue")
    else:
        frame_ret = cv.medianBlur(frame_ret, 9)

    if ROI:
        frame_ret = patch_sqaure_roi(orig, frame_ret, cv.INTER_AREA, debug_level=debug_level)
    else:
        frame_ret = cv.cvtColor(np.uint8(frame_ret), cv.COLOR_GRAY2RGB)
    return frame_ret
159 
160 
# Video source: first command-line argument, or 0 when none is given
# (OpenCV interprets an integer as a capture device index).
try:
    fn = sys.argv[1]
except IndexError:
    fn = 0

tic_capture = time.perf_counter()

cap = cv.VideoCapture(fn)
frames = []
frames_ret = []

# Check if the source opened successfully. Without frames the rest of the
# script cannot run, so stop here with a failure status.
if not cap.isOpened():
    print("Error opening video stream or file")
    sys.exit(1)

# Read until the video is completed.
# TODO: find a more efficient way, without loading all video in ram, e.g. PILs
while cap.isOpened():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:
        break
    frames.append(frame)

# When everything is done, release the video capture object.
cap.release()
toc_capture = time.perf_counter()
190 
tic_consumers = time.perf_counter()
# Submit one remote task per captured frame. In accel mode the tasks block on
# fpgas_queue.get() until the endpoints below are made available.
consumers = [
    consumer.remote(accel_mode, fpgas_queue, video_frame, debug_level=debug_level)
    for video_frame in frames
]

# Alternative endpoint sets, kept for reference (all on port "2718"):
#
# 256
#   "10.12.200.97", "10.12.200.30", "10.12.200.36", "10.12.200.209",
#   "10.12.200.89", "10.12.200.207", "10.12.200.178", "10.12.200.27",
#   "10.12.200.126", "10.12.200.73"
#
# 512
#   "10.12.200.66", "10.12.200.124", "10.12.200.239", "10.12.200.28",
#   "10.12.200.62", "10.12.200.31", "10.12.200.226", "10.12.200.243",
#   "10.12.200.76", "10.12.200.249", "10.12.200.140", "10.12.200.126"

# 1024 (active set): [address, port] pairs handed out to the consumer tasks.
fpga_endpoints = [
    ["10.12.200.17" , "2718"],
    ["10.12.200.20" , "2718"],
    ["10.12.200.69" , "2718"],
    ["10.12.200.208", "2718"],
    ["10.12.200.179", "2718"],
    ["10.12.200.223", "2718"],
    ["10.12.200.182", "2718"],
    ["10.12.200.232", "2718"],
    ["10.12.200.59" , "2718"],
]
for fpga_endpoint in fpga_endpoints:
    fpgas_queue.put(fpga_endpoint)

toc_consumers = time.perf_counter()
232 
# Block until every consumer task has produced its frame (order is preserved).
tic_exec = time.perf_counter()
results = ray.get(consumers)
toc_exec = time.perf_counter()
logging.info("Tasks executed")

# Without at least one frame the writer geometry cannot be determined.
if not results:
    logging.error("No frames were processed; there is nothing to save.")
    sys.exit(1)

tic_save = time.perf_counter()
video_name = str(fn)+"_out.avi"

# VideoWriter expects the frame size as (width, height); shape is (rows, cols, ...).
out_rows, out_cols = results[0].shape[:2]
video_out = cv.VideoWriter(video_name, cv.VideoWriter_fourcc('M','J','P','G'), 30, (out_cols, out_rows))
try:
    for processed_frame in results:
        video_out.write(processed_frame)
finally:
    # Release the writer even if a write fails, so the file handle is closed.
    video_out.release()
logging.info("Saved video: " + video_name)
toc_save = time.perf_counter()

# Timing summary.
logging.info(f"Tasks executed : {toc_exec - tic_exec:0.4f} seconds")
logging.info(f"Consumers time : {toc_consumers - tic_consumers:0.4f} seconds")
logging.info(f"Loading frames : {toc_capture - tic_capture:0.4f} seconds")
logging.info(f"Saving video : {toc_save - tic_save:0.4f} seconds")
252 
253 
def patch_sqaure_roi(orig, frame, interpolation=cv.INTER_AREA, debug_level=debug_level)
def consumer(accel_mode, fpgas_queue, frame, debug_level=debug_level)
def crop_square_roi(img, size, interpolation=cv.INTER_AREA, debug_level=debug_level)