cloudFPGA (cF) API  1.0
The documentation of the source code of cloudFPGA (cF)
tc_TcpRecv.py
Go to the documentation of this file.
1 # *
2 # * Copyright 2016 -- 2021 IBM Corporation
3 # *
4 # * Licensed under the Apache License, Version 2.0 (the "License");
5 # * you may not use this file except in compliance with the License.
6 # * You may obtain a copy of the License at
7 # *
8 # * http://www.apache.org/licenses/LICENSE-2.0
9 # *
10 # * Unless required by applicable law or agreed to in writing, software
11 # * distributed under the License is distributed on an "AS IS" BASIS,
12 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # * See the License for the specific language governing permissions and
14 # * limitations under the License.
15 # *
16 
17 # *****************************************************************************
18 # * @file : tc_TcpRecv.py
19 # * @brief : A single-threaded script to receive traffic on the TCP
20 # * connection of an FPGA module (i.e. FPGA --> HOST).
21 # *
22 # * System: : cloudFPGA
23 # * Component : cFp_BringUp/ROLE
24 # * Language : Python 3
25 # *
26 # *****************************************************************************
27 
28 # ### REQUIRED PYTHON PACKAGES ################################################
import argparse
import datetime
import errno
import faulthandler
import ipaddress
import os
import random
import socket
import struct
import sys
import time

import netifaces as ni
from netifaces import AF_INET

# ### REQUIRED TESTCASE MODULES ###############################################
from tc_utils import *
42 
43 
def tcp_rx_loop(clientSock, serverSock, size, ip_da, tcp_dp, count, verbose=False):
    """TCP Rx Single-Thread Loop.

    Requests the FPGA 'count' times to send a segment of 'size' bytes and expects the HOST
    to receive each of them on the socket (ip_da, tcp_dp). A request is the 8-byte big-endian
    message {ip_da (uint32), tcp_dp (uint16), size (uint16)} written to 'clientSock'.
    According to the original documentation, every segment carries the repetitive pattern
    '48692066726f6d20464d4b553630210a' ("Hi from FMKU60!\\n"); the payload is NOT verified
    here, only the number of received bytes is accounted for.

    When all requests are done, the total number of received bytes, the achieved bandwidth
    and (if bytes are missing) the data loss in percent are printed.

    :param clientSock  The socket to send the requests to.
    :param serverSock  The socket to receive the segments from.
    :param size        The size of the expected segment (in bytes).
    :param ip_da       The destination address of the host (as an unsigned 32-bit integer).
    :param tcp_dp      The destination port of the host.
    :param count       The number of segments to receive.
    :param verbose     Enables verbosity.
    :return None
    :raises SystemExit on a socket error other than 'no data available yet'.
    """
    if verbose:
        print("[INFO] Requesting the FPGA to send %d segments of %d bytes on TCP port number %d." % (count, size, tcp_dp))
    loop = 0
    totalReceivedBytes = 0
    peerClosed = False

    # Set the client and server sockets non-blocking
    # -----------------------------------------------
    # NOTE: setblocking(False) is the same as settimeout(0.0); a timeout configured just
    #  before would be overridden, hence none is set here.
    clientSock.setblocking(False)
    serverSock.setblocking(False)

    # Request the test to generate a segment of length='size' and to send it to socket={ip_da, tcp_dp}
    #  by sending 'ip_da', 'tcp_dp' and 'size' to the FPGA.
    #  Turn the 'ip_da' into an unsigned int binary and 'tcp_dp' and 'size' into an unsigned short binary data.
    reqMsgAsBytes = struct.pack(">IHH", ip_da, tcp_dp, size)
    if verbose:
        print("[DEBUG] >>> reqMsgAsBytes = %s" % reqMsgAsBytes)

    startTime = datetime.datetime.now()
    while loop < count and not peerClosed:
        # SEND message length request to FPGA
        # ------------------------------------
        try:
            clientSock.sendall(reqMsgAsBytes)
        except OSError as exception:
            print("[EXCEPTION] Socket error while transmitting :: %s" % exception)
            sys.exit(1)

        # RECEIVE message length bytes from FPGA
        # ---------------------------------------
        currRxByteCnt = 0
        nonBlockingTrials = 0
        while currRxByteCnt < size:
            try:
                # MTU is not defined in this function; it is expected at module level
                #  (presumably provided by the 'tc_utils' star-import).
                data = serverSock.recv(MTU)
            except OSError as e:
                # On non-blocking sockets an error is raised when there is no incoming data.
                #  Some operating systems report it as EAGAIN and some as EWOULDBLOCK. Both
                #  are expected and mean 'try again'; any other error code is fatal.
                if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK:
                    print('\n[ERROR] Socket reading error: {}'.format(str(e)))
                    sys.exit(1)
                # We just did not receive anything
                if nonBlockingTrials > 100000:
                    # Give up on this segment only; the next request is still issued.
                    print('\n[ERROR] Too many socket read attempts ({:d})'.format(nonBlockingTrials))
                    break
                nonBlockingTrials += 1
                if verbose and (nonBlockingTrials % 100 == 0):
                    print(".", end="")
                continue

            if not data:
                # An empty read means that the peer closed the connection. No more bytes
                #  will ever arrive, so polling again would spin forever.
                print('\n[ERROR] Connection closed by the remote FPGA after %d of %d bytes' % (currRxByteCnt, size))
                peerClosed = True
                break

            currRxByteCnt += len(data)
            if verbose:
                print("\n[INFO] Loop=%6d | RxBytes=%3d | " % (loop, len(data)))
                print("RxData=%s" % data)
            # [FIXME-TODO: assess the stream of received bytes]
            nonBlockingTrials = 0

        totalReceivedBytes += currRxByteCnt
        loop += 1
    endTime = datetime.datetime.now()
    elapseTime = endTime - startTime

    if totalReceivedBytes < 1000000:
        print("[INFO] Received a total of %d bytes." % totalReceivedBytes)
    elif totalReceivedBytes < 1000000000:
        megaBytes = (totalReceivedBytes * 1.0) / (1024 * 1024 * 1.0)
        print("[INFO] Received a total of %.1f MB." % megaBytes)
    else:
        gigaBytes = (totalReceivedBytes * 1.0) / (1024 * 1024 * 1024 * 1.0)
        print("[INFO] Transferred a total of %.1f GB." % gigaBytes)

    # Guard against a zero elapsed time (e.g. nothing requested on a coarse clock).
    elapsedSeconds = elapseTime.total_seconds()
    if elapsedSeconds > 0:
        bandwidth = (totalReceivedBytes * 8 * 1.0) / (elapsedSeconds * 1024 * 1024)
    else:
        bandwidth = 0.0
    print("#####################################################")
    if bandwidth < 1000:
        print("#### TCP Rx DONE with bandwidth = %6.1f Mb/s" % bandwidth)
    else:
        bandwidth = bandwidth / 1000
        print("#### TCP Rx DONE with bandwidth = %2.1f Gb/s" % bandwidth)

    expectedBytes = size * count if count > 0 else 0
    if totalReceivedBytes != expectedBytes:
        # Percentage of the requested bytes that never reached the host.
        if expectedBytes > 0:
            dataLoss = 100.0 * (expectedBytes - totalReceivedBytes) / expectedBytes
        else:
            dataLoss = 0.0
        print("#### [WARNING] TCP data loss = %.1f%%" % dataLoss)
        # [FIXME] Should return an error code
    print("#####################################################")
    print()
154 
155 
def tcp_rx_ramp(clientSock, serverSock, ip_da, tcp_dp, verbose=False, start=1, end=0xFFFF):
    """TCP Rx Single-Thread Ramp.

    Requests the FPGA to send one segment for every size from 'start' to 'end' (inclusive),
    i.e. ('end' - 'start' + 1) segments of increasing length, and expects the HOST to receive
    them on the socket (ip_da, tcp_dp). A request is the 8-byte big-endian message
    {ip_da (uint32), tcp_dp (uint16), size (uint16)} written to 'clientSock'.
    According to the original documentation, every segment carries the repetitive pattern
    '48692066726f6d20464d4b553630210a' ("Hi from FMKU60!\\n"); the payload is NOT verified
    here, only the number of received bytes is accounted for.

    When the ramp is done, the total number of received bytes, the achieved bandwidth and
    (if bytes are missing) the data loss in percent are printed. Nothing is requested when
    'start' is greater than 'end'.

    :param clientSock  The socket to send the requests to.
    :param serverSock  The socket to receive the segments from.
    :param ip_da       The destination address of the host (as an unsigned 32-bit integer).
    :param tcp_dp      The destination port of the host.
    :param verbose     Enables verbosity.
    :param start       The start size of the ramp (in bytes).
    :param end         The end size of the ramp (in bytes).
    :return None
    :raises SystemExit on a socket error or when a segment does not show up in time.
    """
    if verbose:
        print("[INFO] Requesting the FPGA to send a ramp of segments of increasing size from %d to %d bytes." % (
            start, end))
    totalReceivedBytes = 0
    peerClosed = False

    # Set the client and server sockets non-blocking
    # -----------------------------------------------
    # NOTE: setblocking(False) is the same as settimeout(0.0); a timeout configured just
    #  before would be overridden, hence none is set here.
    clientSock.setblocking(False)
    serverSock.setblocking(False)

    startTime = datetime.datetime.now()

    for size in range(start, end + 1):

        # Request the test to generate a segment of length='size' and to send it to
        #  socket={ip_da, tcp_dp}. This procedure is triggered by sending 'ip_da', 'tcp_dp' and
        #  'size' to the FPGA.
        reqMsgAsBytes = struct.pack(">IHH", ip_da, tcp_dp, size)
        if verbose:
            print("[DEBUG] Requesting a segment of size=%d bytes (reqMsgAsBytes = %s)" % (size, reqMsgAsBytes))

        # SEND message length request to FPGA
        # ------------------------------------
        try:
            clientSock.sendall(reqMsgAsBytes)
        except OSError as exception:
            print("[EXCEPTION] Socket error while transmitting :: %s" % exception)
            sys.exit(1)

        # RECEIVE message length bytes from FPGA
        # ---------------------------------------
        currRxByteCnt = 0
        nonBlockingTrials = 0
        while currRxByteCnt < size:
            try:
                # MTU is not defined in this function; it is expected at module level
                #  (presumably provided by the 'tc_utils' star-import).
                data = serverSock.recv(MTU)
            except OSError as e:
                # On non-blocking sockets an error is raised when there is no incoming data.
                #  Some operating systems report it as EAGAIN and some as EWOULDBLOCK. Both
                #  are expected and mean 'try again'; any other error code is fatal.
                if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK:
                    print('[ERROR] Socket reading error: {}'.format(str(e)))
                    sys.exit(1)
                # We just did not receive anything
                if nonBlockingTrials > 1000000:
                    print('\n[ERROR] Too many socket read attempts ({:d})'.format(nonBlockingTrials))
                    sys.exit(1)
                nonBlockingTrials += 1
                if verbose and (nonBlockingTrials % 100 == 0):
                    print(".", end="")
                continue

            if not data:
                # An empty read means that the peer closed the connection. No more bytes
                #  will ever arrive, so polling again would spin forever.
                print('\n[ERROR] Connection closed by the remote FPGA after %d of %d bytes' % (currRxByteCnt, size))
                peerClosed = True
                break

            currRxByteCnt += len(data)
            if verbose:
                print("[INFO] ReqBytes=%d | RxBytes=%d\n" % (size, len(data)))
                print("RxData=%s\n" % data)
            # [FIXME-TODO: assess the stream of received bytes]
            nonBlockingTrials = 0

        totalReceivedBytes += currRxByteCnt
        if peerClosed:
            # The remaining sizes of the ramp can no longer be served.
            break

    endTime = datetime.datetime.now()
    elapseTime = endTime - startTime

    if totalReceivedBytes < 1000000:
        print("[INFO] Received a total of %d bytes." % totalReceivedBytes)
    elif totalReceivedBytes < 1000000000:
        megaBytes = (totalReceivedBytes * 1.0) / (1024 * 1024 * 1.0)
        print("[INFO] Received a total of %.1f MB." % megaBytes)
    else:
        gigaBytes = (totalReceivedBytes * 1.0) / (1024 * 1024 * 1024 * 1.0)
        print("[INFO] Transferred a total of %.1f GB." % gigaBytes)

    # Guard against a zero elapsed time (e.g. empty ramp on a coarse clock).
    elapsedSeconds = elapseTime.total_seconds()
    if elapsedSeconds > 0:
        bandwidth = (totalReceivedBytes * 8 * 1.0) / (elapsedSeconds * 1024 * 1024)
    else:
        bandwidth = 0.0
    print("#####################################################")
    if bandwidth < 1000:
        print("#### TCP Rx DONE with bandwidth = %6.1f Mb/s" % bandwidth)
    else:
        bandwidth = bandwidth / 1000
        print("#### TCP Rx DONE with bandwidth = %2.1f Gb/s" % bandwidth)

    # Expected total = start + (start+1) + ... + end (arithmetic series); 0 for an empty ramp.
    if end >= start:
        expectedBytes = (end - start + 1) * (start + end) // 2
    else:
        expectedBytes = 0
    if totalReceivedBytes != expectedBytes:
        # Percentage of the requested bytes that never reached the host.
        if expectedBytes > 0:
            dataLoss = 100.0 * (expectedBytes - totalReceivedBytes) / expectedBytes
        else:
            dataLoss = 0.0
        print("#### [WARNING] TCP data loss = %.1f%%" % dataLoss)
    print("#####################################################")
    print()
270 
271 
272 
277 
# -----------------------------------------------------------------------------
# MAIN SCRIPT
#  Connects to the FPGA server, asks the FPGA to open a connection back to a
#  listening socket of this host, and then runs either the ramp test (seed == 0)
#  or the fixed-size loop test over that pair of connections.
#  The helpers getFpgaIpv4(), getInstanceId(), getResourceManagerIpv4(),
#  getResourceManagerPort(), restartApp(), pingFpga() and the constant
#  XMIT_MODE_LSN_PORT are expected to come from 'tc_utils'.
# -----------------------------------------------------------------------------

#  STEP-1: Parse the command line strings into Python objects
# -----------------------------------------------------------------------------
parser = argparse.ArgumentParser(description='A script to receive TCP data from an FPGA module.')
parser.add_argument('-fi', '--fpga_ipv4', type=str, default='',
                    help='The IPv4 address of the FPGA (a.k.a image_ip / e.g. 10.12.200.163)')
parser.add_argument('-ii', '--inst_id', type=int, default=0,
                    help='The instance ID assigned by the cloudFPGA Resource Manager (e.g. 42)')
parser.add_argument('-lc', '--loop_count', type=int, default=10,
                    help='The number of test runs (default is 10)')
parser.add_argument('-mi', '--mngr_ipv4', type=str, default='10.12.0.132',
                    help='The IPv4 address of the cloudFPGA Resource Manager (default is 10.12.0.132)')
parser.add_argument('-mp', '--mngr_port', type=int, default=8080,
                    help='The TCP port of the cloudFPGA Resource Manager (default is 8080)')
parser.add_argument('-sd', '--seed', type=int, default=-1,
                    help='The initial number to seed the pseudorandom number generator.')
parser.add_argument('-sz', '--size', type=int, default=-1,
                    help='The size of the datagram to receive.')
parser.add_argument('-un', '--user_name', type=str, default='',
                    help='A user-name as used to log in ZYC2 (.e.g \'fab\')')
parser.add_argument('-up', '--user_passwd', type=str, default='',
                    help='The ZYC2 password attached to the user-name')
parser.add_argument('-v', '--verbose', action="store_true",
                    help='Enable verbosity')

args = parser.parse_args()

if args.user_name == '' or args.user_passwd == '':
    print("\n[WARNING] You must provide a ZYC2 user name and the corresponding password for this script to execute.\n")
    sys.exit(1)

#  STEP-2a: Retrieve the IP address of the FPGA module (this will be the SERVER)
# -----------------------------------------------------------------------------
ipFpga = getFpgaIpv4(args)

#  STEP-2b: Retrieve the instance Id assigned by the cloudFPGA Resource Manager
# -----------------------------------------------------------------------------
instId = getInstanceId(args)

#  STEP-2c: Retrieve the IP address of the cF Resource Manager
# -----------------------------------------------------------------------------
ipResMngr = getResourceManagerIpv4(args)

#  STEP-3a: Set the TCP listen port of the FPGA server (this one is static)
# -----------------------------------------------------------------------------
portFpgaServer = XMIT_MODE_LSN_PORT  # 8801

#  STEP-3b: Retrieve the TCP port of the cloudFPGA Resource Manager
# -----------------------------------------------------------------------------
portResMngr = getResourceManagerPort(args)

#  STEP-4: Trigger the FPGA role to restart (i.e. perform SW reset of the role)
# -----------------------------------------------------------------------------
restartApp(instId, ipResMngr, portResMngr, args.user_name, args.user_passwd)

#  STEP-5: Ping the FPGA
# -----------------------------------------------------------------------------
pingFpga(ipFpga)

#  STEP-6a: Set the socket association for sending to the FPGA server
# -----------------------------------------------------------------------------
fpgaServerAssociation = (str(ipFpga), portFpgaServer)

#  STEP-6b: Create a TCP/IP client socket for sending to FPGA server
# -----------------------------------------------------------------------------
try:
    tcpClientSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except OSError as exc:
    print("[EXCEPTION] %s" % exc)
    sys.exit(1)

#  STEP-6c: Allow this socket to be re-used and disable the Nagle's algorithm
# ----------------------------------------------------------------------------
tcpClientSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcpClientSock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)

#  STEP-6d: Connect the host client to the remote FPGA server.
# -----------------------------------------------------------------------------
try:
    tcpClientSock.connect(fpgaServerAssociation)
    print("[INFO] Connecting host client to FPGA server socket ", fpgaServerAssociation)
except OSError as exc:
    print("[EXCEPTION] %s" % exc)
    sys.exit(1)
else:
    print("[INFO] \tStatus --> Successful connection.")

#  STEP-7a: Set the socket association for listening from the FPGA client
# -----------------------------------------------------------------------------
hostname = socket.gethostname()
ipHostStr = socket.gethostbyname(hostname)
if ipHostStr.startswith('9.4.'):
    # The resolved address starts with '9.4.': use the address of the OpenVPN tunnel
    #  instead (FYI: the user-VPN always starts with 10.2.X.X).
    tunnelIpStr = ""
    for itf in ni.interfaces():
        if not itf.startswith('tun'):
            continue
        # A tunnel interface without IPv4 configuration has no AF_INET entry.
        for addrInfo in ni.ifaddresses(itf).get(AF_INET, []):
            ip4Str = addrInfo.get('addr', '')
            if ip4Str.startswith('10.2.'):
                tunnelIpStr = ip4Str
                break
        if tunnelIpStr:
            break
    if tunnelIpStr == "":
        print("[ERROR] Could not find IPv4 address of the tunnel associated with the user-VPN.\n")
        sys.exit(1)
    ipHostStr = tunnelIpStr
ipHost = int(ipaddress.IPv4Address(ipHostStr))
if args.verbose:
    print("[INFO] Hostname = %s | IP is %s (0x%8.8X)" % (hostname, ipHostStr, ipHost))
dpHost = 2718  # Default TCP cF-Themisto ports are in range 2718-2750
hostListenAssociation = (str(ipHostStr), dpHost)

#  STEP-7b: Create a TCP/IP socket for listening to FPGA client(s) and allow its re-use.
# -----------------------------------------------------------------------------
try:
    tcpListenSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except OSError as exc:
    print("[EXCEPTION] %s" % exc)
    sys.exit(1)
tcpListenSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# [TODO] Set the OS receive buffer of the listen socket to 64KB+ (SO_RCVBUF).

#  STEP-7c: Bind the listen socket of the HOST (compulsory) and start listening on it.
#   bind()   -> associates the listen socket with a specific IP address and TCP port.
#   listen() -> puts the listening socket in server mode.
# -----------------------------------------------------------------------------
try:
    tcpListenSock.bind(hostListenAssociation)
    print("[INFO] Binding and start listening on host socket ", hostListenAssociation)
except OSError as exc:
    print("[EXCEPTION] %s" % exc)
    sys.exit(1)
tcpListenSock.listen(1)

#  STEP-8: Request the remote FPGA to open a new connection. This will trigger the 3-way
#   handshake connection with the listening socket and will provide us with a new TCP
#   socket for the server to communicate with the FPGA.
#   The request carries the host address and port with a segment size of 0.
# -----------------------------------------------------------------------------
reqMsgAsBytes = struct.pack(">IHH", ipHost, dpHost, 0)
print("[INFO] Requesting the remote FPGA client to open a connection with the host")
print("[DEBUG] With message = reqMsgAsBytes = %s" % reqMsgAsBytes)
try:
    tcpClientSock.sendall(reqMsgAsBytes)
except OSError as exception:
    print("[EXCEPTION] Socket error while transmitting :: %s" % exception)
    sys.exit(1)

#  STEP-9: Wait (at most 10 seconds) for the incoming connection from the remote FPGA client.
# -----------------------------------------------------------------------------
print("[INFO] Waiting for a connection from remote FPGA")
try:
    tcpListenSock.settimeout(10)
    print("[INFO] tcpListenSock.timeout=10")
    tcpServerSock, fpgaClientAssociation = tcpListenSock.accept()
except OSError as exception:
    # Includes the timeout of the accept().
    print("[EXCEPTION] Socket error while waiting for a connection from remote FPGA :: %s" % exception)
    sys.exit(1)
print("[INFO] Received a connection from FPGA socket address ", fpgaClientAssociation)
print()

try:
    #  STEP-10: Setup the test
    # -------------------------------
    print("[INFO] Testcase `%s` is run with:" % (os.path.basename(__file__)))
    seed = args.seed
    if seed == -1:
        seed = random.randint(0, 100000)
    random.seed(seed)
    print("\t\t seed = %d" % seed)

    # [FIXME-Size greater than 2048 is not supported yet because of the ACK-race bug.]
    TODO_MAX_SEG_SIZE = 2048
    size = args.size
    if size == -1:
        size = random.randint(1, TODO_MAX_SEG_SIZE)
    elif size < 1 or size > TODO_MAX_SEG_SIZE:
        # A size of 0 is the 'open connection' request and negative or oversized values
        #  cannot be encoded or are not supported, so reject them before running.
        print("\n[ERROR] This test limits the size of the received segments to the range 1..%d bytes.\n" % TODO_MAX_SEG_SIZE)
        sys.exit(1)
    # A seed of 0 selects the ramp test, which ignores 'size' and 'count'.
    if seed != 0:
        print("\t\t size = %d" % size)

    count = args.loop_count
    if seed != 0:
        print("\t\t loop = %d" % count)

    verbose = args.verbose

    #  STEP-11: Run the test
    # -------------------------------
    print("[INFO] This testcase is sending traffic from FPGA-to-HOST.")
    print("[INFO] This run is executed in single-threading mode.\n")
    if seed == 0:
        # [FIXME-Size greater than 2048 not supported yet because of the ACK-race bug.]
        tcp_rx_ramp(tcpClientSock, tcpServerSock, ipHost, dpHost, args.verbose, 1, 10)
        tcp_rx_ramp(tcpClientSock, tcpServerSock, ipHost, dpHost, args.verbose, 11, 100)
        tcp_rx_ramp(tcpClientSock, tcpServerSock, ipHost, dpHost, args.verbose, 101, 1000)
        tcp_rx_ramp(tcpClientSock, tcpServerSock, ipHost, dpHost, args.verbose, 1001, 2048)
    else:
        tcp_rx_loop(tcpClientSock, tcpServerSock, size, ipHost, dpHost, count, args.verbose)
finally:
    #  STEP-12: Close sockets (also when the test aborts)
    # ----------------------------------------------------
    time.sleep(2)
    tcpClientSock.close()
    tcpServerSock.close()
    tcpListenSock.close()
def tcp_rx_ramp(clientSock, serverSock, ip_da, tcp_dp, verbose=False, start=1, end=0xFFFF)
Definition: tc_TcpRecv.py:156
def tcp_rx_loop(clientSock, serverSock, size, ip_da, tcp_dp, count, verbose=False)
Definition: tc_TcpRecv.py:44
def restartApp(instId, ipResMngr, portResMngr, user_name, user_passwd)
Definition: tc_utils.py:174
def getFpgaIpv4(args)
Definition: tc_utils.py:95
def getInstanceId(args)
Definition: tc_utils.py:153
def getResourceManagerIpv4(args)
Definition: tc_utils.py:124
def pingFpga(ipFpga)
Definition: tc_utils.py:199
def getResourceManagerPort(args)
Definition: tc_utils.py:142