cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
test_median_blur_images_ray.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # *****************************************************************************
4 # * cloudFPGA
5 # * Copyright 2016 -- 2022 IBM Corporation
6 # * Licensed under the Apache License, Version 2.0 (the "License");
7 # * you may not use this file except in compliance with the License.
8 # * You may obtain a copy of the License at
9 # *
10 # * http://www.apache.org/licenses/LICENSE-2.0
11 # *
12 # * Unless required by applicable law or agreed to in writing, software
13 # * distributed under the License is distributed on an "AS IS" BASIS,
14 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # * See the License for the specific language governing permissions and
16 # * limitations under the License.
17 # *----------------------------------------------------------------------------
18 
19 
23 
24 '''
25 Usage:
26  test_median_blur_images_ray.py <video file name>
27 
28  Shows how Python Ray capabilities can be used
29  to organize a parallel frame processing pipeline
30  with cloudFPGA.
31 
32 '''
33 
34 import ray
35 from ray.util.queue import Queue
36 
37 import sys
38 import os
39 import numpy as np
40 import cv2 as cv
41 import logging
42 import time
43 from trieres import *
44 
# ---------------------------------------------------------------------------
# Run configuration
# ---------------------------------------------------------------------------

# Logging goes to stdout at this level; the same level is handed to workers.
debug_level = logging.DEBUG
logging.basicConfig(stream=sys.stdout, level=debug_level)

# False: frames are blurred with cv.medianBlur; True: frames are sent to the
# FPGA entries taken from fpgas_queue.
accel_mode = False

# Frame geometry used to reshape the accelerator output, and the element
# count passed to the accelerator call.
width, height = 3840, 2160
total_size = height * width

DATASET_SIZE = 50  # how many copies of the input frame make up the dataset
split = 1          # how many frames each remote task handles
REPEATS = 1        # how many times the whole run is timed

# Attach to the Ray cluster. Previously used alternatives:
#   ray.init(dashboard_port=50051, num_cpus=12)
#   ray.init(address='ray://192.168.1.8:10001')
ray.init(address='ray://10.1.1.24:10001')

# Optional cluster summary, kept for reference:
#   print('''This cluster consists of
#    {} nodes in total
#    {} CPU resources in total
#   '''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

# Queue of FPGA entries; it can be passed around to different tasks/actors.
fpgas_queue = Queue(maxsize=100)
68 
69 #@ray.remote
def consumer(accel_mode, fpgas_queue, frames, i, debug_level=debug_level):
    """Median-blur ``frames[i]`` and return the blurred frame.

    Args:
        accel_mode: When false, the frame is blurred locally with
            ``cv.medianBlur(frame, 9)``. When true, an entry is taken from
            ``fpgas_queue`` and the frame is sent to
            ``trieres.vision.median_blur``.
        fpgas_queue: Queue of FPGA entries. Each entry is indexed as
            ``entry[0]`` and ``int(entry[1])`` (presumably host and port --
            confirm against the trieres API).
        frames: Indexable collection of frames.
        i: Index of the frame to process.
        debug_level: Logging level for this worker; also forwarded to the
            accelerator call.

    Returns:
        The blurred frame. In accelerated mode the result is reshaped to the
        module-level ``(height, width)``.

    Raises:
        Whatever ``fpgas_queue.get`` raises when no entry becomes available
        within 100 seconds, and whatever the accelerator call raises.
    """
    logging.basicConfig(stream=sys.stdout, level=debug_level)

    frame = frames[i]
    if not accel_mode:
        logging.debug(f"Apllying medianBlur")
        return cv.medianBlur(frame, 9)

    next_item = fpgas_queue.get(block=True, timeout=100)
    logging.debug(f"will work on {next_item} and then put in back in the fpgas_queue")
    try:
        # Flattening the image from 2D to 1D
        image = frame.flatten()
        output_array = trieres.vision.median_blur(
            image, total_size, next_item[0], int(next_item[1]),
            debug_level=debug_level)
        frame = np.reshape(output_array, (height, width))
    finally:
        # Always hand the FPGA back, even if the call above failed; otherwise
        # the entry is lost and other workers may wait on the queue forever.
        fpgas_queue.put(next_item)
    logging.debug(f"finished working on {next_item} Now it is back in the fpgas_queue")
    return frame
88 
89 
@ray.remote
def mega_work(accel_mode, fpgas_queue, frames, debug_level, start, end):
    """Ray task that blurs the frames with indices ``start`` to ``end - 1``.

    Each index is handed to ``consumer`` in order; the blurred frames are
    returned as a list in the same order.
    """
    print(f"will work on [{start}-{end}]")
    blurred = []
    for idx in range(start, end):
        blurred.append(
            consumer(accel_mode, fpgas_queue, frames, idx, debug_level=debug_level)
        )
    return blurred
94 
def flatten(input):
    """Flatten a list of lists by exactly one level.

    The decision is made from the first element only: if it is a list, every
    element is expected to be iterable and their items are concatenated into
    a new list. Otherwise ``input`` is returned unchanged (the same object).

    An empty ``input`` is returned as is instead of raising ``IndexError``.
    """
    # len() rather than truthiness so array-like inputs are handled too.
    if len(input) == 0 or not isinstance(input[0], list):
        return input
    return [item for sub in input for item in sub]
104 
# Optional first command-line argument; 0 when none was given.
# Only a missing argument is handled here, so KeyboardInterrupt and
# SystemExit are no longer swallowed by a bare ``except``.
try:
    fn = sys.argv[1]
except IndexError:
    fn = 0
109 
# Build the dataset: load one image, convert it to grayscale, and use it
# DATASET_SIZE times. tic_capture/toc_capture bracket the loading time.
tic_capture = time.perf_counter()

frames_ret = []
frame = cv.imread(cv.samples.findFile("CARLA.jpg"))
if frame is None:
    sys.exit("Could not read the image.")

# cv.imread returns pixels in BGR order, so the conversion must be BGR2GRAY;
# RGB2GRAY would apply the red and blue weights to the wrong channels.
frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)

# Every entry refers to the same array, as with repeated append().
frames = [frame] * DATASET_SIZE

toc_capture = time.perf_counter()
124 
# FPGA entries made available to the workers, as [address, port] pairs.
# This is the set labelled "512" in the original script.
# The alternative set labelled "256" was:
#   10.12.200.171, 10.12.200.73, 10.12.200.205, 10.12.200.69, 10.12.200.181
#   (all on port "2718")
fpga_entries = [
    ["10.12.200.9", "2718"],
    ["10.12.200.212", "2718"],
    ["10.12.200.170", "2718"],
    ["10.12.200.83", "2718"],
    ["10.12.200.234", "2718"],
    ["10.12.200.219", "2718"],
    ["10.12.200.21", "2718"],
    ["10.12.200.243", "2718"],
    ["10.12.200.76", "2718"],
    ["10.12.200.249", "2718"],
    ["10.12.200.140", "2718"],
    ["10.12.200.126", "2718"],
]
fpgas_registered = False

# Run the whole submit/collect cycle REPEATS times and record wall-clock time.
elapsed_times = []
for _ in range(REPEATS):
    start_time = time.time()

    tic_consumers = time.perf_counter()
    frames_id = ray.put(frames)
    fpgas_queue_id = ray.put(fpgas_queue)

    # One remote task per chunk of `split` frames. Stepping by `split` and
    # clamping the end index keeps a trailing partial chunk instead of
    # silently dropping it when len(frames) is not a multiple of split.
    num_frames = len(frames)
    consumers = [
        mega_work.remote(accel_mode, fpgas_queue_id, frames_id, debug_level,
                         first, min(first + split, num_frames))
        for first in range(0, num_frames, split)
    ]

    # Register the FPGAs only once. Workers put each entry back after use, so
    # re-adding them on every repeat would create duplicate entries (two
    # workers sharing one FPGA) and eventually block on the queue's maxsize.
    if not fpgas_registered:
        for entry in fpga_entries:
            fpgas_queue.put(entry)
        fpgas_registered = True

    toc_consumers = time.perf_counter()

    tic_exec = time.perf_counter()

    # Block until every task has finished, then merge the per-task lists.
    results = ray.get(consumers)
    results = flatten(results)

    toc_exec = time.perf_counter()
    logging.info("Tasks executed")

    logging.info(f"Tasks executed : {toc_exec - tic_exec:0.4f} seconds")
    logging.info(f"Consumers time : {toc_consumers - tic_consumers:0.4f} seconds")
    logging.info(f"Loading frames : {toc_capture - tic_capture:0.4f} seconds")

    end_time = time.time()
    elapsed_times.append(end_time - start_time)
173 
# Write every result frame to CARLA_out_<index>.jpg and time the whole step.
tic_save = time.perf_counter()
image_name = None
for t, blurred in enumerate(results):
    image_name = "CARLA_out_"+str(t)+".jpg"
    # cv.imwrite reports failure through its return value, not an exception.
    if not cv.imwrite(image_name, blurred):
        logging.warning("Could not write image: " + image_name)
# With no results there is no "last" image to report.
if image_name is not None:
    logging.info("Last saved image: " + image_name)
toc_save = time.perf_counter()
logging.info(f"Saving images : {toc_save - tic_save:0.4f} seconds")
181 
# Summarize the per-repeat wall-clock times (sorted ascending).
elapsed_times = np.sort(elapsed_times)
num_samples = len(elapsed_times)
# Divide by the number of samples actually collected.
average_elapsed_time = sum(elapsed_times) / num_samples


def _percentile_time(percent):
    """Return the sorted sample at the given percentile (nearest rank)."""
    rank = round((percent / 100) * num_samples)
    return elapsed_times[max(rank - 1, 0)]


print(elapsed_times)
print("Time required to submit a trivial function call:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(_percentile_time(90)))
print(" 99th percentile: {}".format(_percentile_time(99)))
print(" best: {}".format(elapsed_times[0]))
# The worst time is the last sorted sample; the 99.9th-percentile index used
# before stops being the maximum once there are enough samples.
print(" worst: {}".format(elapsed_times[-1]))
def consumer(accel_mode, fpgas_queue, frames, i, debug_level=debug_level)
def mega_work(accel_mode, fpgas_queue, frames, debug_level, start, end)