26 test_median_blur_numpi_video_threaded.py <video file name>
28 Shows how Ray's capabilities in Python can be used
29 to organize a parallel frame-processing pipeline.
35 from ray.util.queue
import Queue
# Module-level configuration shared by the functions below.
# NOTE(review): the leading integers on these lines look like line-number
# residue from a listing export rather than Python — confirm against the
# original file.
# Verbosity used for every logging.basicConfig call in this script.
46 debug_level = logging.DEBUG
# Send log records to stdout at the level chosen above.
47 logging.basicConfig(stream=sys.stdout, level=debug_level)
# Pixel count of one frame; `height` and `width` are defined outside this view.
51 total_size = height * width
# Connect to an existing Ray cluster; the address is hard-coded.
59 ray.init(address=
'ray://10.1.1.24:10001')
# Shared queue (capacity 100) from ray.util.queue; it is later filled with
# [ip, port] pairs that consumer() borrows and returns.
67 fpgas_queue = Queue(maxsize=100)
# consumer: blur one frame, either through trieres.vision.median_blur using an
# endpoint borrowed from `fpgas_queue`, or locally with cv.medianBlur.
# NOTE(review): several lines of this function are not visible here (the
# assignment of `frame`, the branch that selects between the two paths, and
# the return). Presumably `frame = frames[i]`, the branch tests `accel_mode`,
# and the blurred frame is returned — confirm against the original file.
# The default `debug_level=debug_level` captures the module-level value at
# definition time.
70 def consumer(accel_mode, fpgas_queue, frames, i, debug_level=debug_level):
# Logging is configured again inside the function; presumably because it runs
# in a separate Ray worker process — TODO confirm.
71 logging.basicConfig(stream=sys.stdout, level=debug_level)
# Borrow one endpoint; waits up to 100 seconds for the queue to yield an item.
75 next_item = fpgas_queue.get(block=
True, timeout=100)
76 logging.debug(f
"will work on {next_item} and then put in back in the fpgas_queue")
# Flatten the frame to 1-D before handing it to median_blur.
78 image = frame.flatten()
# next_item[0] and next_item[1] are the two fields of the borrowed queue item;
# the second is converted to int (the queue is filled with [ip, port] strings).
79 output_array = trieres.vision.median_blur(image, total_size, next_item[0],
int(next_item[1]), debug_level=debug_level)
# Restore the 2-D (height, width) layout from the flat result.
80 frame = np.reshape(output_array, (height, width))
# Hand the endpoint back so another call can use it.
# NOTE(review): no try/finally is visible, so an exception in median_blur
# would apparently leave the endpoint out of the queue — confirm.
82 fpgas_queue.put(next_item)
83 logging.debug(f
"finished working on {next_item} Now it is back in the fpgas_queue")
# Local OpenCV path: median blur with an aperture size of 9.
# NOTE(review): the log message below contains a typo ("Apllying").
85 logging.debug(f
"Apllying medianBlur")
86 frame = cv.medianBlur(frame, 9)
def mega_work(accel_mode, fpgas_queue, frames, debug_level, start, end):
    """Run ``consumer`` over the frame indices ``start`` .. ``end - 1``.

    Prints the index range being handled, then calls ``consumer`` once per
    index with the same ``accel_mode``, ``fpgas_queue``, ``frames`` and
    ``debug_level``.

    Returns:
        A list with one ``consumer`` result per index, in index order.
    """
    # Announce the chunk on stdout.
    banner = "will work on [" + str(start) + "-" + str(end) + "]"
    print(banner)

    processed = []
    for index in range(start, end):
        processed.append(
            consumer(accel_mode, fpgas_queue, frames, index, debug_level=debug_level)
        )
    return processed
# NOTE(review): isolated condition — the enclosing definition and the body of
# this `if` are not visible here. It tests whether the first element of
# `input` is a list. `isinstance(input[0], list)` is the usual spelling, and
# `input` presumably is a parameter that shadows the builtin — confirm.
96 if(
type(input[0])==list):
# Start of the "Loading frames" timing window.
110 tic_capture = time.perf_counter()
# Locate CARLA.jpg through OpenCV's sample-file search and decode it.
114 frame = cv.imread(cv.samples.findFile(
"CARLA.jpg"))
# Abort the script with an error message.
# NOTE(review): the guard for this exit is not visible here; presumably
# `if frame is None:` — confirm.
116 sys.exit(
"Could not read the image.")
# Convert the loaded colour image to single-channel grayscale.
# cv.imread returns channels in B, G, R order, so the BGR conversion code is
# the one that applies the luminance weights to the right channels.
frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
# Runs DATASET_SIZE times.
# NOTE(review): the loop body is not visible here; presumably it appends the
# grayscale frame to `frames` — confirm.
119 for i
in range(DATASET_SIZE):
# End of the "Loading frames" timing window.
123 toc_capture = time.perf_counter()
# Benchmark loop, repeated REPEATS times.
# NOTE(review): indentation is lost, so which of the following statements
# belong to this loop cannot be determined from here.
126 for _
in range(REPEATS):
# Wall-clock start for this repeat; later used for elapsed_times.
127 start_time = time.time()
# Start of the "Consumers time" window.
129 tic_consumers = time.perf_counter()
# Store the frame list and the queue in the Ray object store, so tasks receive
# object references.
130 frames_id = ray.put(frames)
131 fpgas_queue_id = ray.put(fpgas_queue)
# Submit one mega_work task per chunk of `split` consecutive frame indices.
# Stepping through the indices and clamping the end also covers a final,
# shorter chunk; the previous int(len(frames) / split) count silently skipped
# the trailing frames whenever len(frames) was not a multiple of `split`.
consumers = [
    mega_work.remote(
        accel_mode,
        fpgas_queue_id,
        frames_id,
        debug_level,
        chunk_start,
        min(chunk_start + split, len(frames)),
    )
    for chunk_start in range(0, len(frames), split)
]
# Make the accelerator endpoints available to the workers. Each entry is an
# [ip, port] pair of strings. A plain loop is used because put() is called
# only for its side effect; no result list is needed.
for fpga_endpoint in [
    ["10.12.200.9", "2718"],
    ["10.12.200.212", "2718"],
    ["10.12.200.170", "2718"],
    ["10.12.200.83", "2718"],
    ["10.12.200.234", "2718"],
    ["10.12.200.219", "2718"],
    ["10.12.200.21", "2718"],
    ["10.12.200.243", "2718"],
    ["10.12.200.76", "2718"],
    ["10.12.200.249", "2718"],
    ["10.12.200.140", "2718"],
    ["10.12.200.126", "2718"],
]:
    fpgas_queue.put(fpga_endpoint)
# End of the "Consumers time" window. It spans the ray.put calls, the task
# submission and the queue fill.
157 toc_consumers = time.perf_counter()
# Start of the "Tasks executed" window.
159 tic_exec = time.perf_counter()
# Wait for every submitted task and collect the results.
161 results = ray.get(consumers)
164 toc_exec = time.perf_counter()
165 logging.info(f
"Tasks executed")
# Report the three timing windows measured above.
167 logging.info(f
"Tasks executed : {toc_exec - tic_exec:0.4f} seconds")
168 logging.info(f
"Consumers time : {toc_consumers - tic_consumers:0.4f} seconds")
169 logging.info(f
"Loading frames : {toc_capture - tic_capture:0.4f} seconds")
# Record the wall-clock duration of this repeat.
171 end_time = time.time()
172 elapsed_times.append(end_time - start_time)
# Write each entry of `results` to CARLA_out_<index>.jpg.
# NOTE(review): mega_work returns a list per task, so each results[t] would be
# a list of frames unless `results` is flattened on a line not visible here —
# confirm.
174 tic_save = time.perf_counter()
175 for t
in range(len(results)):
176 image_name =
"CARLA_out_"+
str(t)+
".jpg"
177 cv.imwrite(image_name, results[t])
# NOTE(review): if this statement sits after the loop, `image_name` is
# undefined when `results` is empty — nesting cannot be determined from here.
178 logging.info(
"Last saved image: " + image_name)
179 toc_save = time.perf_counter()
180 logging.info(f
"Saving images : {toc_save - tic_save:0.4f} seconds")
# Summarise the per-repeat wall-clock timings collected in `elapsed_times`.
# Sorting ascending lets the percentiles, best and worst be read by index.
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / REPEATS

print("Time required to submit a trivial function call:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[round((90/100) * REPEATS)-1]))
print(" 99th percentile: {}".format(elapsed_times[round((99/100) * REPEATS)-1]))
print(" best: {}".format(elapsed_times[0]))
# The slowest run is the last element of the sorted array. Indexing at the
# 99.9th percentile reported a faster run once REPEATS exceeded 500.
print(" worst: {}".format(elapsed_times[-1]))