cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
tc_TcpSend.py
Go to the documentation of this file.
1 # *
2 # * Copyright 2016 -- 2021 IBM Corporation
3 # *
4 # * Licensed under the Apache License, Version 2.0 (the "License");
5 # * you may not use this file except in compliance with the License.
6 # * You may obtain a copy of the License at
7 # *
8 # * http://www.apache.org/licenses/LICENSE-2.0
9 # *
10 # * Unless required by applicable law or agreed to in writing, software
11 # * distributed under the License is distributed on an "AS IS" BASIS,
12 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # * See the License for the specific language governing permissions and
14 # * limitations under the License.
15 # *
16 
17 # *****************************************************************************
18 # * @file : tc_TcpSend.py
19 # * @brief : A single-threaded script to send traffic on the TCP
20 # * connection of an FPGA module (i.e. HOST --> FPGA).
21 # *
22 # * System: : cloudFPGA
23 # * Component : cFp_Monolithic/ROLE
24 # * Language : Python 3
25 # *
26 # *****************************************************************************
27 
28 # ### REQUIRED PYTHON PACKAGES ################################################
29 import argparse
30 import datetime
31 import socket
32 import time
33 
34 # ### REQUIRED TESTCASE MODULES ###############################################
35 from tc_utils import *
36 
37 
38 def tcp_tx_loop(sock, message, count, verbose=False):
39  """TCP Tx Single-Thread Loop.
40  :param sock The socket to send/receive to/from.
41  :param message The message string to sent.
42  :param count The number of segments to send.
43  :param verbose Enables verbosity.
44  :return None"""
45  if verbose:
46  print("[INFO] The following message of %d bytes will be sent out %d times:\n Message=%s\n"
47  % (len(message), count, message.decode()))
48  nrErr = 0
49  loop = 0
50  txByteCnt = 0
51  startTime = datetime.datetime.now()
52  while loop < count:
53  # Send segment
54  # -------------------
55  try:
56  sock.sendall(message)
57  except socket.error as exc:
58  # Any exception
59  print("[EXCEPTION] Socket error while transmitting :: %s" % exc)
60  exit(1)
61  finally:
62  pass
63  txByteCnt += len(message)
64  if verbose:
65  print("Loop=%d | TxBytes=%d" % (loop, txByteCnt))
66  # if loop > 10:
67  # time.sleep(0.1)
68  # input('Hit <Enter> to continue:')
69  loop += 1
70  endTime = datetime.datetime.now()
71  elapseTime = endTime - startTime
72 
73  display_throughput(txByteCnt, elapseTime)
74 
75 def tcp_tx_slowpace(sock, message, count, pause, verbose=False):
76  """TCP Tx test at reduce speed (by inserting a sleep duration in between two transmissions)
77  :param sock The socket to send/receive to/from.
78  :param message The message string to sent.
79  :param count The number of segments to send.
80  :param pause The idle duration between two segments (in seconds)
81  :param verbose Enables verbosity.
82  :return None"""
83  if verbose:
84  print("[INFO] TCP-TX-SLOW-PACE: The following message of %d bytes will be sent out %d times"
85  " with an inter-gap time of %f seconds:\n Message=%s\n" %
86  (len(message), count, pause, message.decode()))
87  nrErr = 0
88  loop = 0
89  totalByteCnt = 0
90  txWinByteCnt = 0
91  startTime = datetime.datetime.now()
92  while loop < count:
93  # Send segment
94  # -------------------
95  try:
96  sock.sendall(message)
97  except socket.error as exc:
98  # Any exception
99  print("[EXCEPTION] Socket error while transmitting :: %s" % exc)
100  exit(1)
101  finally:
102  pass
103  txWinByteCnt += len(message)
104  totalByteCnt += len(message)
105  if verbose:
106  print("Loop=%6d | TotalTxBytes=%6d | Pause=%4f sec" % (loop+1, totalByteCnt, pause))
107  time.sleep(pause)
108  txWinByteCnt = 0
109  loop += 1
110  endTime = datetime.datetime.now()
111  elapseTime = endTime - startTime
112 
113  display_throughput(totalByteCnt, elapseTime)
114 
115 
116 def tcp_tx_payload_ramp(sock, message, count, pause=0.0, verbose=False):
117  """TCP Tx Single-Thread Ramp. Send a buffer of bytes with 64-bit unsigned integer numbers
118  ramping up from 1 to len(message).
119  :param sock The socket to send/receive to/from.
120  :param message The message string to sent.
121  :param count The number of segments to send.
122  :param pause The idle duration between two segments (in seconds)
123  :param verbose Enables verbosity.
124  :return None"""
125  strStream = ""
126  size = len(message) * count
127  if size <= 8:
128  for i in range(0, size-1):
129  strStream += '0'
130  else:
131  rampSize = int(size/8)
132  for i in range(0, rampSize):
133  strTmp = "{:08d}".format(i)
134  # Swap the generated 8 bytes
135  strStream += strTmp[7]
136  strStream += strTmp[6]
137  strStream += strTmp[5]
138  strStream += strTmp[4]
139  strStream += strTmp[3]
140  strStream += strTmp[2]
141  strStream += strTmp[1]
142  strStream += strTmp[0]
143  for i in range(0, int(size % 8)):
144  strStream += 'E'
145  bytStream = strStream.encode()
146 
147  if verbose:
148  print("[INFO] The following stream of %d bytes will be sent out:\n Message=%s\n" %
149  (len(message), bytStream.decode()))
150 
151  startTime = datetime.datetime.now()
152 
153  if pause == 0.0:
154  # --------------------------------
155  # Send the entire stream at once
156  # --------------------------------
157  try:
158  sock.sendall(bytStream)
159  except socket.error as exc:
160  # Any exception
161  print("[EXCEPTION] Socket error while transmitting :: %s" % exc)
162  exit(1)
163  finally:
164  pass
165  else:
166  # --------------------------------------
167  # Send the stream in pieces of 8 bytes.
168  # The use of a 'pause' enforces the TCP 'PSH'
169  # --------------------------------------
170  for i in range(0, rampSize):
171  subStr = bytStream[i*8: i*8+8]
172  try:
173  sock.sendall(subStr)
174  except socket.error as exc:
175  # Any exception
176  print("[EXCEPTION] Socket error while transmitting :: %s" % exc)
177  exit(1)
178  finally:
179  pass
180  time.sleep(pause)
181  if size % 8:
182  subStr = bytStream[(size/8)*8: (size/8)*8 + (size % 8)]
183  try:
184  sock.sendall(subStr)
185  except socket.error as exc:
186  # Any exception
187  print("[EXCEPTION] Socket error while transmitting :: %s" % exc)
188  exit(1)
189  finally:
190  pass
191  time.sleep(pause)
192 
193  endTime = datetime.datetime.now()
194  elapseTime = endTime - startTime
195 
196  txByteCnt = len(message) * count
197  display_throughput(txByteCnt, elapseTime)
198 
199 def tcp_tx_seg_size_ramp(sock, message, count, pause=0.0, verbose=False):
200  """Send a ramp of increasing segment sizes starting from 1 bytes up to len(message).
201  :param sock The socket to send/receive to/from.
202  :param message The message string to sent.
203  :param count The number of segments to send.
204  :param pause The idle duration between two segments (in seconds)
205  :param verbose Enables verbosity.
206  :return None"""
207 
208  nrErr = 0
209  loop = 0
210  txByteCnt = 0
211  startTime = datetime.datetime.now()
212 
213  while loop < count:
214  for i in range(1, len(message)):
215  # Send segment of length 'i'
216  subMsg = message[0: i]
217  try:
218  sock.sendall(subMsg)
219  except socket.error as exc:
220  # Any exception
221  print("[EXCEPTION] Socket error while transmitting :: %s" % exc)
222  exit(1)
223  finally:
224  pass
225  txByteCnt += len(message)
226  if verbose:
227  print("Loop=%4.4d | SegmentLength=%4.4d" % (loop, i))
228  if pause != 0.0:
229  time.sleep(pause)
230  loop += 1
231 
232  endTime = datetime.datetime.now()
233  elapseTime = endTime - startTime
234 
235  display_throughput(txByteCnt, elapseTime)
236 
237 
238 
243 
244 # STEP-1: Parse the command line strings into Python objects
245 # -----------------------------------------------------------------------------
246 parser = argparse.ArgumentParser(description='A script to send TCP data to an FPGA module.')
247 parser.add_argument('-fi', '--fpga_ipv4', type=str, default='',
248  help='The IPv4 address of the FPGA (a.k.a image_ip / e.g. 10.12.200.163)')
249 parser.add_argument('-ii', '--inst_id', type=int, default=0,
250  help='The instance ID assigned by the cloudFPGA Resource Manager (e.g. 42)')
251 parser.add_argument('-lc', '--loop_count', type=int, default=10,
252  help='The number of test runs (default is 10)')
253 parser.add_argument('-mi', '--mngr_ipv4', type=str, default='10.12.0.132',
254  help='The IPv4 address of the cloudFPGA Resource Manager (default 10.12.0.132)')
255 parser.add_argument('-mp', '--mngr_port', type=int, default=8080,
256  help='The TCP port of the cloudFPGA Resource Manager (default is 8080)')
257 parser.add_argument('-nr', '--no_reset', action="store_true",
258  help='Do not reset the application role')
259 parser.add_argument('-sd', '--seed', type=int, default=-1,
260  help='The initial number to seed the pseudorandom number generator.')
261 parser.add_argument('-st', '--sleep_time', type=float, default=0.0,
262  help='Enforce a sleep time in between two segments (in seconds)')
263 parser.add_argument('-sz', '--size', type=int, default=-1,
264  help='The size of the datagram to generate.')
265 parser.add_argument('-un', '--user_name', type=str, default='',
266  help='A user-name as used to log in ZYC2 (.e.g \'fab\')')
267 parser.add_argument('-up', '--user_passwd', type=str, default='',
268  help='The ZYC2 password attached to the user-name')
269 parser.add_argument('-v', '--verbose', action="store_true",
270  help='Enable verbosity')
271 
272 args = parser.parse_args()
273 
274 if args.user_name == '' or args.user_passwd == '':
275  print("\nWARNING: You must provide a ZYC2 user name and the corresponding password for this "
276  "script to execute.\n")
277  exit(1)
278 
279 # STEP-2a: Retrieve the IP address of the FPGA module (this will be the SERVER)
280 # -----------------------------------------------------------------------------
281 ipFpga = getFpgaIpv4(args)
282 
283 # STEP-2b: Retrieve the instance Id assigned by the cloudFPGA Resource Manager
284 # -----------------------------------------------------------------------------
285 instId = getInstanceId(args)
286 
287 # STEP-2c: Retrieve the IP address of the cF Resource Manager
288 # -----------------------------------------------------------------------------
289 ipResMngr = getResourceManagerIpv4(args)
290 
291 # STEP-3a: Set the TCP listen port of the FPGA server (this one is static)
292 # -----------------------------------------------------------------------------
293 portFpga = RECV_MODE_LSN_PORT # 8800
294 
295 # STEP-3b: Retrieve the TCP port of the cloudFPGA Resource Manager
296 # -----------------------------------------------------------------------------
297 portResMngr = getResourceManagerPort(args)
298 
299 # STEP-4: Trigger the FPGA role to restart (i.e. perform SW reset of the role)
300 # -----------------------------------------------------------------------------
301 if not args.no_reset:
302  restartApp(instId, ipResMngr, portResMngr, args.user_name, args.user_passwd)
303 
304 # STEP-5: Ping the FPGA
305 # -----------------------------------------------------------------------------
306 pingFpga(ipFpga)
307 
308 # STEP-6a: Set the FPGA socket association
309 # -----------------------------------------------------------------------------
310 fpgaAssociation = (str(ipFpga), portFpga)
311 
312 # STEP-6b: Set the HOST socket association (optional)
313 # Info: Linux selects a source port from an ephemeral port range, which by
314 # default is a set to range from 32768 to 61000. You can check it
315 # with the command:
316 # > cat /proc/sys/net/ipv4/ip_local_port_range
317 # If we want to force the source port ourselves, we must use the
318 # "bind before connect" trick.
319 # -----------------------------------------------------------------------------
320 if 0:
321  tcpSP = portFpga + 49152 # 8803 + 0xC000
322  hostAssociation = (ipSaStr, tcpSP)
323 
324 # STEP-8a: Create a TCP/IP socket for the TCP/IP connection
325 # -----------------------------------------------------------------------------
326 try:
327  tcpSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
328 except Exception as exc:
329  print("[EXCEPTION] %s" % exc)
330  exit(1)
331 
332 # Step-8b: Allow this socket to be re-used and disable the Nagle's algorithm
333 # ----------------------------------------------------------------------------
334 tcpSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
335 tcpSock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
336 
337 # STEP-8c: Bind before connect (optional).
338 # This trick enables us to ask the kernel to select a specific source IP and
339 # source PORT by calling bind() before calling connect().
340 # -----------------------------------------------------------------------------
341 if 0:
342  try:
343  tcpSock.bind(hostAssociation)
344  print('Binding the socket address of the HOST to {%s, %d}' % hostAssociation)
345  except Exception as exc:
346  print("[EXCEPTION] %s" % exc)
347  exit(1)
348 
349 # STEP-9a: Connect with remote FPGA
350 # -----------------------------------------------------------------------------
351 try:
352  tcpSock.connect(fpgaAssociation)
353 except Exception as exc:
354  print("[EXCEPTION] %s" % exc)
355  exit(1)
356 else:
357  print('\nSuccessful connection with socket address of FPGA at {%s, %d} \n' % fpgaAssociation)
358 
359 # STEP-9b: Set the socket non-blocking
360 # --------------------------------------
361 tcpSock.setblocking(False)
362 tcpSock.settimeout(5)
363 
364 # STEP-10: Setup the test
365 # -------------------------------
366 print("[INFO] Testcase `%s` is run with:" % (os.path.basename(__file__)))
367 seed = args.seed
368 if seed == -1:
369  seed = random.randint(0, 100000)
370 random.seed(seed)
371 print("\t\t seed = %d" % seed)
372 
373 size = args.size
374 if size == -1:
375  size = random.randint(1, ZYC2_MSS)
376 elif size > ZYC2_MSS:
377  print('\nERROR: ')
378  print("[ERROR] This test-case expects the transfer of segment which are less or equal to MSS "
379  "(.i.e %d bytes).\n" % ZYC2_MSS)
380  exit(1)
381 print("\t\t size = %d" % size)
382 
383 count = args.loop_count
384 print("\t\t loop = %d" % count)
385 
386 if seed % 2:
387  message = str_rand_gen(size)
388 else:
389  message = str_static_gen(size)
390 
391 verbose = args.verbose
392 
393 # STEP-11: Run the test
394 # -------------------------------
395 print("[INFO] This testcase is sending traffic from HOST-to-FPGA. ")
396 print("[INFO] It is run in single-threading mode.\n")
397 if seed == 0:
398  # SEND A PAYLOAD WITH INCREMENTAL DATA NUMBERS
399  message = "X" * size
400  tcp_tx_payload_ramp(tcpSock, message, count, args.sleep_time, args.verbose)
401 elif seed == 1:
402  # SEND A RAMP OF INCREMENTAL SEGMENT SIZES
403  tcp_tx_seg_size_ramp(tcpSock, message, count, args.sleep_time, args.verbose)
404 else:
405  if args.sleep_time > 0.0:
406  # RUN THE TEST AT LOW SPEED
407  tcp_tx_slowpace(tcpSock, message, count, args.sleep_time, args.verbose)
408  else:
409  tcp_tx_loop(tcpSock, message, count, args.verbose)
410 
411 # STEP-14: Close socket
412 # -----------------------
413 time.sleep(2)
414 tcpSock.close()
415 
416 
def tcp_tx_loop(sock, message, count, verbose=False)
Definition: tc_TcpSend.py:38
def tcp_tx_seg_size_ramp(sock, message, count, pause=0.0, verbose=False)
Definition: tc_TcpSend.py:199
def tcp_tx_payload_ramp(sock, message, count, pause=0.0, verbose=False)
Definition: tc_TcpSend.py:116
def tcp_tx_slowpace(sock, message, count, pause, verbose=False)
Definition: tc_TcpSend.py:75
def restartApp(instId, ipResMngr, portResMngr, user_name, user_passwd)
Definition: tc_utils.py:174
def getFpgaIpv4(args)
Definition: tc_utils.py:95
def getInstanceId(args)
Definition: tc_utils.py:153
def getResourceManagerIpv4(args)
Definition: tc_utils.py:124
def str_rand_gen(size)
Definition: tc_utils.py:89
def str_static_gen(size)
Definition: tc_utils.py:79
def pingFpga(ipFpga)
Definition: tc_utils.py:199
def display_throughput(byteCount, elapseTime)
Definition: tc_utils.py:211
def getResourceManagerPort(args)
Definition: tc_utils.py:142