cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
tc_TcpEcho.py
Go to the documentation of this file.
1 #/*
2 # * Copyright 2016 -- 2021 IBM Corporation
3 # *
4 # * Licensed under the Apache License, Version 2.0 (the "License");
5 # * you may not use this file except in compliance with the License.
6 # * You may obtain a copy of the License at
7 # *
8 # * http://www.apache.org/licenses/LICENSE-2.0
9 # *
10 # * Unless required by applicable law or agreed to in writing, software
11 # * distributed under the License is distributed on an "AS IS" BASIS,
12 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # * See the License for the specific language governing permissions and
14 # * limitations under the License.
15 # *
16 
17 # *****************************************************************************
18 # * @file : tc_TcpEcho.py
19 # * @brief : A multi-threaded script to send and receive traffic on the
20 # * TCP connection of an FPGA module.
21 # *
22 # * System: : cloudFPGA
23 # * Component : cFp_BringUp/ROLE
24 # * Language : Python 3
25 # *
26 # *****************************************************************************
27 
28 # ### REQUIRED PYTHON PACKAGES ################################################
29 import argparse
30 import datetime
31 import errno
32 import filecmp
33 import socket
34 import threading
35 import time
36 
37 # ### REQUIRED TESTCASE MODULES ###############################################
38 from tc_utils import *
39 
40 # ### GLOBAL VARIABLES ########################################################
41 gEchoRxPath = './echoRx.dat'
42 gEchoTxPath = './echoTx.dat'
43 
44 def tcp_tx(sock, message, count, verbose=False):
45  """TCP Tx Thread.
46  :param sock, the socket to send to.
47  :param message, the random string to sent.
48  :param count, the number of segments to send.
49  :param verbose, enables verbosity.
50  :return None"""
51  if verbose:
52  print("The following message of %d bytes will be sent out %d times:\n Message=%s\n" %
53  (len(message), count, message.decode('ascii')))
54 
55  # Create a Tx Reference File
56  echoTxFile = open(gEchoTxPath, 'w')
57  if count <= 1000:
58  loop = 0
59  while loop < count:
60  echoTxFile.write(message.decode('ascii'))
61  loop += 1
62 
63  # Start Data Transmission
64  loop = 0
65  startTime = datetime.datetime.now()
66  while loop < count:
67  try:
68  sock.sendall(message)
69  finally:
70  pass
71  loop += 1
72  endTime = datetime.datetime.now()
73  elapseTime = endTime - startTime;
74  bandwidth = len(message) * 8 * count * 1.0 / (elapseTime.total_seconds() * 1024 * 1024)
75  print("##################################################")
76  print("#### TCP TX DONE with bandwidth = %6.1f Mb/s ####" % bandwidth)
77  print("##################################################")
78  print()
79 
80  # Close the Tx Reference File
81  echoTxFile.close()
82  # Push a few more bytes to force the FPGA to flush its buffers
83  try:
84  sock.sendall(message)
85  finally:
86  pass
87 
88 
89 def tcp_rx(sock, message, count, verbose):
90  """TCP Rx Thread.
91  :param sock, the socket to receive from.
92  :param message, the expected string message to be received.
93  :param count, the number of segment to receive.
94  :param verbose, enables verbosity.
95  :return None"""
96 
97  # Create an Rx Test File
98  echoRxFile = open(gEchoRxPath, 'w')
99 
100  # Start Data Reception
101  loop = 0
102  rxBytes = 0
103  expectedBytes = count*len(message)
104  startTime = datetime.datetime.now()
105  while rxBytes < expectedBytes:
106  try:
107  data = sock.recv(expectedBytes - rxBytes)
108  rxBytes += len(data)
109  if count <= 1000:
110  echoRxFile.write(data.decode('ascii'))
111  except socket.error as exc:
112  print("[EXCEPTION] Socket error while receiving :: %s" % exc)
113  else:
114  if verbose:
115  print("Loop=%d | RxBytes=%d" % (loop, rxBytes))
116  loop += 1
117  endTime = datetime.datetime.now()
118  elapseTime = endTime - startTime
119  bandwidth = len(message) * 8 * count * 1.0 / (elapseTime.total_seconds() * 1024 * 1024)
120  print("##################################################")
121  print("#### TCP RX DONE with bandwidth = %6.1f Mb/s ####" % bandwidth)
122  print("##################################################")
123  print()
124 
125  # Close the Rx Test File
126  echoRxFile.close()
127 
128 
129 def waitUntilSocketPairCanBeReused(ipFpga, portFpga):
130  """Check and wait until the a socket pair can be reused.
131  [INFO] When a client or a server initiates an active close, then the same destination socket
132  (i.e. the same IP address / TCP port number) cannot be re-used immediately because
133  of security issues. Therefore, a closed connection must linger in a 'TIME_WAIT' or
134  'FIN_WAIT' state for as long as 2xMSL (Maximum Segment Lifetime), which corresponds
135  to twice the time a TCP segment might exist in the internet system. The MSL is
136  arbitrarily defined to be 2 minutes long.
137  :param ipFpga: the IP address of FPGA.
138  :param portFpga: the TCP port of the FPGA.
139  :return: nothing
140  """
141  wait = True
142  # NETSTAT example: rc = os.system("netstat | grep '10.12.200.163:8803' | grep TIME_WAIT")
143  cmdStr = "netstat | grep \'" + str(ipFpga) + ":" + str(portFpga) + "\' | grep \'TIME_WAIT\|FIN_WAIT\' "
144  while wait:
145  rc = os.system(cmdStr)
146  if rc == 0:
147  print("[INFO] Cannot reuse this socket as long as it is in the \'TIME_WAIT\' or \'FIN_WAIT\' state.")
148  print(" Let's sleep for 5 sec...")
149  time.sleep(5)
150  else:
151  wait = False
152 
153 def tcp_txrx_loop(sock, message, count, verbose=False):
154  """TCP Tx-Rx Single-Thread Loop.
155  :param sock The socket to send/receive to/from.
156  :param message The message string to sent.
157  :param count The number of segments send.
158  :param verbose Enables verbosity.
159  :return None"""
160  if verbose:
161  print("[INFO] The following message of %d bytes will be sent out %d times:\n Message=%s\n" %
162  (len(message), count, message.decode('ascii')))
163  nrErr = 0
164  txMssgCnt = 0
165  rxMssgCnt = 0
166  rxByteCnt = 0
167  txStream = ""
168  rxStream = ""
169 
170  # Init the Tx reference stream
171  for i in range(count):
172  txStream = txStream + message.decode('ascii')
173 
174  startTime = datetime.datetime.now()
175 
176  while rxByteCnt < (count * len(message)):
177  if txMssgCnt < count:
178  # Send a new message
179  # ------------------------
180  try:
181  tcpSock.sendall(message)
182  txMssgCnt += 1
183  finally:
184  pass
185 
186  # Receive a segment
187  # --------------------
188  try:
189  data = tcpSock.recv(len(message))
190  rxByteCnt += len(data)
191  rxMssgCnt += 1
192  if verbose:
193  print("%d:%s" % (rxMssgCnt, data.decode('ascii')))
194  except IOError as e:
195  # On non blocking connections - when there are no incoming data, error is going to be
196  # raised. Some operating systems will indicate that using AGAIN, and some using
197  # WOULDBLOCK error code. We are going to check for both - if one of them - that's
198  # expected, means no incoming data, continue as normal. If we got different error code,
199  # something happened
200  if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK:
201  print('[ERROR] Socket reading error: {}'.format(str(e)))
202  exit(1)
203  # We just did not receive anything
204  continue
205  except socket.error as exc:
206  # Any other exception
207  print("[EXCEPTION] Socket error while receiving :: %s" % exc)
208  # exit(1)
209  finally:
210  pass
211  rxStream = rxStream + data.decode('ascii')
212 
213  endTime = datetime.datetime.now()
214 
215  if verbose:
216  print("\n")
217 
218  # Compare Tx and Rx stream
219  if rxStream != txStream:
220  print(" KO | Received stream = %s" % data.decode('ascii'))
221  print(" | Expected stream = %s" % rxStream)
222  nrErr += 1
223  elif verbose:
224  print(" OK | Received %d bytes in %d messages." % (rxByteCnt, rxMssgCnt))
225 
226  elapseTime = endTime - startTime;
227  bandwidth = len(message) * 8 * count * 1.0 / (elapseTime.total_seconds() * 1024 * 1024)
228  print("[INFO] Transferred a total of %d bytes." % rxByteCnt)
229  print("#####################################################")
230  print("#### TCP Tx/Rx DONE with bandwidth = %6.1f Mb/s ####" % bandwidth)
231  print("#####################################################")
232  print()
233 
234 def tcp_txrx_ramp(sock, message, count, verbose=False):
235  """TCP Tx-Rx Single-Thread Ramp.
236  :param sock The socket to send/receive to/from.
237  :param message The message string to sent.
238  :param count The number of segments to send.
239  :param verbose Enables verbosity.
240  :return None"""
241  if verbose:
242  print("[INFO] The following message of %d bytes will be sent out incrementally %d times:\n Message=%s\n" %
243  (len(message), count, message.decode('ascii')))
244  nrErr = 0
245  loop = 0
246  rxByteCnt = 0
247  startTime = datetime.datetime.now()
248  while loop < count:
249  i = 1
250  while i <= len(message):
251  subMsg = message[0:i]
252  # Send datagram
253  # -------------------
254  try:
255  tcpSock.sendall(subMsg)
256  finally:
257  pass
258  # Receive datagram
259  # -------------------
260  try:
261  data = tcpSock.recv(len(subMsg))
262  rxByteCnt += len(data)
263  if data == subMsg:
264  if verbose:
265  print("Loop=%d | RxBytes=%d" % (loop, len(data)))
266  else:
267  print("Loop=%d | RxBytes=%d" % (loop, len(data)))
268  print(" KO | Received Message=%s" % data.decode('ascii'))
269  print(" | Expecting Message=%s" % subMsg)
270  nrErr += 1
271  except IOError as e:
272  # On non blocking connections - when there are no incoming data, error is going to be raised
273  # Some operating systems will indicate that using AGAIN, and some using WOULDBLOCK error code
274  # We are going to check for both - if one of them - that's expected, means no incoming data,
275  # continue as normal. If we got different error code - something happened
276  if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK:
277  print('[ERROR] Socket reading error: {}'.format(str(e)))
278  exit(1)
279  # We just did not receive anything
280  continue
281  except socket.error as exc:
282  # Any other exception
283  print("[EXCEPTION] Socket error while receiving :: %s" % exc)
284  # exit(1)
285  finally:
286  pass
287  i += 1
288  loop += 1
289  endTime = datetime.datetime.now()
290  elapseTime = endTime - startTime
291  bandwidth = (rxByteCnt * 8 * count * 1.0) / (elapseTime.total_seconds() * 1024 * 1024)
292  megaBytes = (rxByteCnt * 1.0) / (1024 * 1024 * 1.0)
293  print("[INFO] Transferred a total of %.1f MB." % megaBytes)
294  print("#####################################################")
295  print("#### TCP Tx/Rx DONE with bandwidth = %6.1f Mb/s ####" % bandwidth)
296  print("#####################################################")
297  print()
298 
299 
300 
305 rc = 0
306 
307 # STEP-1: Parse the command line strings into Python objects
308 # -----------------------------------------------------------------------------
309 parser = argparse.ArgumentParser(description='A script to send/receive TCP data to/from an FPGA module.')
310 parser.add_argument('-fi', '--fpga_ipv4', type=str, default='',
311  help='The destination IPv4 address of the FPGA (a.k.a image_ip / e.g. 10.12.200.163)')
312 parser.add_argument('-fp', '--fpga_port', type=int, default=8803,
313  help='The TCP destination port of the FPGA (default is 8803)')
314 parser.add_argument('-ii', '--inst_id', type=int, default=0,
315  help='The instance ID assigned by the cloudFPGA Resource Manager (range is 1-32)')
316 parser.add_argument('-lc', '--loop_count', type=int, default=10,
317  help='The number of times to run run the test (default is 10)')
318 parser.add_argument('-mi', '--mngr_ipv4', type=str, default='10.12.0.132',
319  help='The IP address of the cloudFPGA Resource Manager (default is 10.12.0.132)')
320 parser.add_argument('-mp', '--mngr_port', type=int, default=8080,
321  help='The TCP port of the cloudFPGA Resource Manager (default is 8080)')
322 parser.add_argument('-mt', '--multi_threading', action="store_true",
323  help='Enable multi_threading')
324 parser.add_argument('-sd', '--seed', type=int, default=-1,
325  help='The initial number to seed the pseudo-random number generator.')
326 parser.add_argument('-sz', '--size', type=int, default=-1,
327  help='The size of the segment to generate.')
328 parser.add_argument('-un', '--user_name', type=str, default='',
329  help='A user name as used to log in ZYC2 (.e.g \'fab\')')
330 parser.add_argument('-up', '--user_passwd', type=str, default='',
331  help='The ZYC2 password attached to the user name')
332 parser.add_argument('-v', '--verbose', action="store_true",
333  help='Enable verbosity')
334 
335 args = parser.parse_args()
336 
337 if args.user_name == '' or args.user_passwd == '':
338  print("\nWARNING: You must provide a ZYC2 user name and the corresponding password for this script to execute.\n")
339  exit(1)
340 
341 # STEP-2a: Retrieve the IP address of the FPGA module (this will be the SERVER)
342 # ------------------------------------------------------------------------------
343 ipFpga = getFpgaIpv4(args)
344 
345 # STEP-2b: Retrieve the instance Id assigned by the cloudFPGA Resource Manager
346 # -----------------------------------------------------------------------------
347 instId = getInstanceId(args)
348 
349 # STEP-2c: Retrieve the IP address of the cF Resource Manager
350 # -----------------------------------------------------------------------------
351 ipResMngr = getResourceManagerIpv4(args)
352 
353 # STEP-3a: Retrieve the TCP port of the FPGA server
354 # -----------------------------------------------------------------------------
355 portFpga = getFpgaPort(args)
356 
357 # STEP-3b: Retrieve the TCP port of the cloudFPGA Resource Manager
358 # -----------------------------------------------------------------------------
359 portResMngr = getResourceManagerPort(args)
360 
361 # STEP-?: Configure the application registers
362 # -----------------------------------------------------------------------------
363 # TODO print("\nNow: Configuring the application registers.")
364 # TODO tcpEchoPathThruMode = (0x0 << 4) # See DIAG_CTRL_2 register
365 
366 # STEP-4: Trigger the FPGA role to restart (i.e. perform SW reset of the role)
367 # -----------------------------------------------------------------------------
368 restartApp(instId, ipResMngr, portResMngr, args.user_name, args.user_passwd)
369 
370 # STEP-5: Ping the FPGA
371 # -----------------------------------------------------------------------------
372 pingFpga(ipFpga)
373 
374 # STEP-6a: Set the FPGA socket association
375 # -----------------------------------------------------------------------------
376 tcpDP = 8803 # 8803=0x2263 and 0x6322=25378
377 fpgaAssociation = (str(ipFpga), tcpDP)
378 
379 # STEP-6b: Set the HOST socket association (optional)
380 # Info: Linux selects a source port from an ephemeral port range, which by
381 # default is a set to range from 32768 to 61000. You can check it
382 # with the command:
383 # > cat /proc/sys/net/ipv4/ip_local_port_range
384 # If we want to force the source port ourselves, we must use the
385 # "bind before connect" trick.
386 # -----------------------------------------------------------------------------
387 if 0:
388  tcpSP = tcpDP + 49152 # 8803 + 0xC000
389  hostAssociation = (ipSaStr, tcpSP)
390 
391 # STEP-7: Wait until the current socket can be reused
392 # -----------------------------------------------------------------------------
393 if 0:
394  waitUntilSocketPairCanBeReused(ipFpga, portFpga)
395 
396 # STEP-8a: Create a TCP/IP socket for the TCP/IP connection
397 # -----------------------------------------------------------------------------
398 try:
399  tcpSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
400 except Exception as exc:
401  print("[EXCEPTION] %s" % exc)
402  exit(1)
403 
404 # Step-8b: Allow this socket to be re-used and disable the Nagle's algorithm
405 # ----------------------------------------------------------------------------
406 tcpSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
407 tcpSock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
408 
409 # STEP-8c: Bind before connect (optional).
410 # This trick enables us to ask the kernel to select a specific source IP and
411 # source PORT by calling bind() before calling connect().
412 # -----------------------------------------------------------------------------
413 if 0:
414  try:
415  tcpSock.bind(hostAssociation)
416  print('Binding the socket address of the HOST to {%s, %d}' % hostAssociation)
417  except Exception as exc:
418  print("[EXCEPTION] %s" % exc)
419  exit(1)
420 
421 # STEP-9: Connect to the remote FPGA
422 # -----------------------------------------------------------------------------
423 try:
424  tcpSock.connect(fpgaAssociation)
425 except Exception as exc:
426  print("[EXCEPTION] %s" % exc)
427  exit(1)
428 else:
429  print('\nSuccessful connection with socket address of FPGA at {%s, %d} \n' % fpgaAssociation)
430 
431 # STEP-10: Setup the test
432 # -------------------------------
433 print("[INFO] Testcase `%s` is run with:" % (os.path.basename(__file__)))
434 seed = args.seed
435 if seed == -1:
436  seed = random.randint(0, 100000)
437 random.seed(seed)
438 print("\t\t seed = %d" % seed)
439 
440 size = args.size
441 if size == -1:
442  size = random.randint(1, ZYC2_MSS)
443 elif size > ZYC2_MSS:
444  print('\nERROR: ')
445  print("[ERROR] This test-case expects the transfer of segment which are less or equal to MSS (.i.e %d bytes).\n" % ZYC2_MSS)
446  exit(1)
447 print("\t\t size = %d" % size)
448 
449 count = args.loop_count
450 print("\t\t loop = %d" % count)
451 
452 if seed % 1:
453  message = str_static_gen(size)
454 else:
455  message = str_rand_gen(size)
456 
457 verbose = args.verbose
458 
459 print("[INFO] This testcase is sending traffic from HOST-to-FPGA and back from FPGA-to-HOST.")
460 if args.multi_threading:
461  print("[INFO] This run is executed in multi-threading mode.\n")
462  # STEP-11: Create Rx and Tx threads
463  # ----------------------------------
464  tx_thread = threading.Thread(target=tcp_tx, args=(tcpSock, message, count, args.verbose))
465  rx_thread = threading.Thread(target=tcp_rx, args=(tcpSock, message, count, args.verbose))
466  # STEP-12: Start the threads
467  # ---------------------------
468  tx_thread.start()
469  rx_thread.start()
470  # STEP-13: Wait for threads to terminate
471  # ----------------------------------------
472  tx_thread.join()
473  rx_thread.join()
474  # STEP-14: Compare Rx and Tx files
475  # ----------------------------------------
476  result = filecmp.cmp(gEchoTxPath, gEchoRxPath, shallow=False)
477  if not result:
478  print("\n[ERROR] Rx file \'%s\' differs from Tx file \'%s\'.\n" % (gEchoRxPath, gEchoTxPath))
479  rc = 1
480  else:
481  os.remove(gEchoRxPath)
482  os.remove(gEchoTxPath)
483 else:
484  print("[INFO] The run is executed in single-threading mode.\n")
485  # STEP-11: Set the socket in non-blocking mode
486  # ----------------------------------------------
487  tcpSock.setblocking(False)
488  tcpSock.settimeout(5)
489  if seed == 0:
490  tcp_txrx_ramp(tcpSock, message, count, args.verbose)
491  else:
492  tcp_txrx_loop(tcpSock, message, count, args.verbose)
493 
494 # STEP-14: Close socket
495 # -----------------------
496 time.sleep(2)
497 tcpSock.close()
498 
499 exit(rc)
def tcp_txrx_loop(sock, message, count, verbose=False)
Definition: tc_TcpEcho.py:153
def tcp_rx(sock, message, count, verbose)
Definition: tc_TcpEcho.py:89
def tcp_tx(sock, message, count, verbose=False)
Definition: tc_TcpEcho.py:44
def tcp_txrx_ramp(sock, message, count, verbose=False)
Definition: tc_TcpEcho.py:234
def waitUntilSocketPairCanBeReused(ipFpga, portFpga)
Definition: tc_TcpEcho.py:129
def restartApp(instId, ipResMngr, portResMngr, user_name, user_passwd)
Definition: tc_utils.py:174
def getFpgaIpv4(args)
Definition: tc_utils.py:95
def getInstanceId(args)
Definition: tc_utils.py:153
def getResourceManagerIpv4(args)
Definition: tc_utils.py:124
def str_rand_gen(size)
Definition: tc_utils.py:89
def getFpgaPort(args)
Definition: tc_utils.py:113
def str_static_gen(size)
Definition: tc_utils.py:79
def pingFpga(ipFpga)
Definition: tc_utils.py:199
def getResourceManagerPort(args)
Definition: tc_utils.py:142