@@ -364,3 +364,305 @@ def get_pcap_mean_pkt_size(self, pcap_path: str) -> float:
364364
365365 mean_pcap_pkt_size = float (output )
366366 return mean_pcap_pkt_size
367+
368+
369+ class EnsoGenSwitch (Pktgen ):
370+ """Python wrapper for the EnsōGen packet generator."""
371+
372+ def __init__ (
373+ self ,
374+ nic : EnsoNic ,
375+ core_id : int = 0 ,
376+ queues : int = 4 ,
377+ single_core : bool = False ,
378+ rtt : bool = False ,
379+ rtt_hist : bool = False ,
380+ rtt_hist_offset : Optional [int ] = None ,
381+ rtt_hist_len : Optional [int ] = None ,
382+ stats_file : Optional [str ] = None ,
383+ hist_file : Optional [str ] = None ,
384+ stats_delay : Optional [int ] = None ,
385+ pcie_addr : Optional [str ] = None ,
386+ log_file : Union [bool , TextIO ] = False ,
387+ check_tx_rate = False ,
388+ ) -> None :
389+ super ().__init__ ()
390+
391+ self .nic = nic
392+ self .queues = queues
393+
394+ self .core_id = core_id
395+ self .single_core = single_core
396+ self .rtt = rtt
397+ self .rtt_hist = rtt_hist
398+ self .rtt_hist_offset = rtt_hist_offset
399+ self .rtt_hist_len = rtt_hist_len
400+ self .stats_delay = stats_delay
401+
402+ if pcie_addr is not None and pcie_addr .count (":" ) == 1 :
403+ pcie_addr = f"0000:{ pcie_addr } " # Add domain.
404+ self .pcie_addr = pcie_addr
405+
406+ self .stats_file = stats_file or "stats.csv"
407+ self .hist_file = hist_file or "hist.csv"
408+
409+ self .log_file = log_file
410+ self .check_tx_rate = check_tx_rate
411+
412+ self .pktgen_cmd = None
413+
414+ self .clean_stats ()
415+
416+ def set_params (
417+ self , pkt_size : int , nb_src : int , nb_dst : int , nb_pcaps : int
418+ ) -> None :
419+ self .pcap_paths = [None * 1024 ]
420+ self .nb_pcaps = 0
421+
422+ nb_pkts = nb_src * nb_dst
423+
424+ remote_dir_path = Path (self .nic .enso_path )
425+
426+ pcap_gen_cmd = remote_dir_path / Path (PCAP_GEN_CMD )
427+ for i in range (nb_pcaps ):
428+ dst_start = i * nb_dst
429+ pcap_name = (
430+ f"{ nb_pkts } _{ pkt_size } _{ nb_src } _{ nb_dst } _{ dst_start } .pcap"
431+ )
432+ pcap_dst = remote_dir_path / Path (PCAPS_DIR ) / Path (pcap_name )
433+
434+ pcap_gen_cmd = (
435+ f"{ pcap_gen_cmd } { nb_pkts } { pkt_size } { nb_src } "
436+ f"{ nb_dst } { dst_start } "
437+ )
438+
439+ pcap_gen_cmd = self .nic .host .run_command (
440+ pcap_gen_cmd , print_command = self .log_file
441+ )
442+ pcap_gen_cmd .watch (stdout = self .log_file , stderr = self .log_file )
443+
444+ status = pcap_gen_cmd .recv_exit_status ()
445+ if status != 0 :
446+ raise RuntimeError ("Error generating pcap" )
447+
448+ self .pcap_paths [i ] = pcap_dst
449+ self .nb_pcaps += 1
450+
451+ def start (self , throughput : float , nb_pkts : int , window_size : int ) -> None :
452+ """Start packet generation.
453+
454+ Args:
455+ throughput: Throughput in bits per second.
456+ nb_pkts: Number of packets to transmit.
457+ """
458+ if self .pcap_paths is None :
459+ raise RuntimeError ("No pcaps were configured" )
460+
461+ bits_per_pkt = (self .mean_pcap_pkt_size + ETHERNET_OVERHEAD ) * 8
462+ pkts_per_sec = throughput / bits_per_pkt
463+ flits_per_pkt = math .ceil (self .mean_pcap_pkt_size / 64 )
464+
465+ hardware_rate = pkts_per_sec * flits_per_pkt / FPGA_RATELIMIT_CLOCK
466+
467+ rate_frac = Fraction (hardware_rate ).limit_denominator (1000 )
468+
469+ self .current_throughput = throughput
470+ self .current_goodput = pkts_per_sec * self .mean_pcap_pkt_size * 8
471+ self .current_pps = pkts_per_sec
472+ self .expected_tx_duration = nb_pkts / pkts_per_sec
473+
474+ # Make sure remote stats file does not exist.
475+ remove_stats_file = self .nic .host .run_command (
476+ f"rm -f { self .stats_file } " ,
477+ print_command = False ,
478+ )
479+ remove_stats_file .watch (stdout = self .log_file , stderr = self .log_file )
480+ status = remove_stats_file .recv_exit_status ()
481+ if status != 0 :
482+ raise RuntimeError (
483+ f"Error removing remote stats file ({ self .stats_file } )"
484+ )
485+
486+ command = (
487+ f"sudo { self .nic .enso_path } /{ ENSOGEN_CMD } "
488+ f" { rate_frac .numerator } { rate_frac .denominator } "
489+ + f" --pkts-per-pcap { window_size } "
490+ f" --count { nb_pkts } "
491+ f" --core { self .core_id } "
492+ f" --queues { self .queues } "
493+ f" --save { self .stats_file } "
494+ )
495+
496+ for i in range (self .nb_pcaps ):
497+ command += f" --pcap-file { self .pcap_paths [i ]} "
498+
499+ if self .single_core :
500+ command += " --single-core"
501+
502+ if self .rtt :
503+ command += " --rtt"
504+
505+ if self .rtt_hist :
506+ command += f" --rtt-hist { self .hist_file } "
507+
508+ if self .rtt_hist_offset is not None :
509+ command += f" --rtt-hist-offset { self .rtt_hist_offset } "
510+
511+ if self .rtt_hist_len is not None :
512+ command += f" --rtt-hist-len { self .rtt_hist_len } "
513+
514+ if self .stats_delay is not None :
515+ command += f" --stats-delay { self .stats_delay } "
516+
517+ if self .pcie_addr is not None :
518+ command += f" --pcie-addr { self .pcie_addr } "
519+
520+ self .pktgen_cmd = self .nic .host .run_command (
521+ command ,
522+ print_command = self .log_file ,
523+ pty = True ,
524+ )
525+
526+ def wait_transmission_done (self ) -> None :
527+ pktgen_cmd = self .pktgen_cmd
528+
529+ if pktgen_cmd is None :
530+ # Pktgen is not running.
531+ return
532+
533+ pktgen_cmd .watch (
534+ stdout = self .log_file ,
535+ stderr = self .log_file ,
536+ keyboard_int = lambda : pktgen_cmd .send (b"\x03 " ),
537+ )
538+ status = pktgen_cmd .recv_exit_status ()
539+ if status != 0 :
540+ raise RuntimeError ("Error running EnsōGen" )
541+
542+ self .update_stats ()
543+
544+ self .pktgen_cmd = None
545+
546+ def update_stats (self ) -> None :
547+ # Make sure transmission rate matches specification for sufficiently
548+ # high rates (i.e., >50Gbps).
549+ calculate_tx_mean = (
550+ self .check_tx_rate
551+ and (self .current_throughput > 50e9 )
552+ and (self .expected_tx_duration > 4 )
553+ )
554+
555+ # Retrieve the latest stats.
556+ with tempfile .TemporaryDirectory () as tmp :
557+ local_stats = f"{ tmp } /stats.csv"
558+ download_file (
559+ self .nic .host_name , self .stats_file , local_stats , self .log_file
560+ )
561+ parsed_stats = EnsoGenStats (local_stats )
562+
563+ stats_summary = parsed_stats .get_summary (calculate_tx_mean )
564+
565+ # Check TX rate.
566+ if calculate_tx_mean :
567+ tx_goodput_mbps = stats_summary ["tx_mean_goodput_mbps" ]
568+ tx_goodput_gbps = tx_goodput_mbps / 1e3
569+ expected_goodput_gbps = self .current_goodput / 1e9
570+
571+ if abs (tx_goodput_gbps - expected_goodput_gbps ) > 1.0 :
572+ raise RuntimeError (
573+ f"TX goodput ({ tx_goodput_gbps } Gbps) does not match "
574+ f"specification ({ expected_goodput_gbps } Gbps)"
575+ )
576+
577+ self .mean_rx_goodput = stats_summary .get ("rx_mean_goodput_mbps" , 0 )
578+ self .mean_rx_goodput *= 1_000_000
579+ self .mean_tx_goodput = stats_summary .get ("tx_mean_goodput_mbps" , 0 )
580+ self .mean_tx_goodput *= 1_000_000
581+
582+ self .mean_rx_rate = stats_summary .get ("rx_mean_rate_kpps" , 0 ) * 1000
583+ self .mean_tx_rate = stats_summary .get ("tx_mean_rate_kpps" , 0 ) * 1000
584+
585+ self .nb_rx_pkts += stats_summary .get ("rx_packets" , 0 )
586+ self .nb_tx_pkts += stats_summary .get ("tx_packets" , 0 )
587+
588+ self .nb_rx_bytes += stats_summary .get ("rx_bytes" , 0 )
589+ self .nb_tx_bytes += stats_summary .get ("tx_bytes" , 0 )
590+
591+ self .mean_rtt = stats_summary .get ("mean_rtt_ns" , 0 )
592+
593+ @property
594+ def pcap_path (self ) -> None :
595+ return self ._pcap_path
596+
597+ @pcap_path .setter
598+ def pcap_path (self , pcap_path ) -> None :
599+ self .mean_pcap_pkt_size = self .get_pcap_mean_pkt_size (pcap_path )
600+ self ._pcap_path = pcap_path
601+
602+ @property
603+ def queues (self ) -> int :
604+ return self ._queues
605+
606+ @queues .setter
607+ def queues (self , queues ) -> None :
608+ self ._queues = queues
609+
610+ def get_nb_rx_pkts (self ) -> int :
611+ return self .nb_rx_pkts
612+
613+ def get_nb_tx_pkts (self ) -> int :
614+ return self .nb_tx_pkts
615+
616+ def get_rx_throughput (self ) -> int :
617+ return self .mean_rx_goodput + self .mean_rx_rate * 24 * 8
618+
619+ def get_tx_throughput (self ) -> int :
620+ return self .mean_tx_goodput + self .mean_tx_rate * 24 * 8
621+
622+ def clean_stats (self ) -> None :
623+ self .nb_rx_pkts = 0
624+ self .nb_rx_bytes = 0
625+ self .mean_rx_goodput = 0
626+ self .mean_rx_rate = 0
627+ self .nb_tx_pkts = 0
628+ self .mean_tx_goodput = 0
629+ self .mean_tx_rate = 0
630+ self .nb_tx_bytes = 0
631+ self .mean_rtt = 0
632+
633+ def stop (self ) -> None :
634+ if self .pktgen_cmd is None :
635+ # Pktgen is not running.
636+ return
637+
638+ if self .pktgen_cmd .exit_status_ready ():
639+ # Pktgen has already exited.
640+ self .pktgen_cmd = None
641+ return
642+
643+ self .pktgen_cmd .send (b"\x03 " )
644+
645+ self .pktgen_cmd .watch (stdout = self .log_file , stderr = self .log_file )
646+ status = self .pktgen_cmd .recv_exit_status ()
647+ if status != 0 :
648+ raise RuntimeError ("Error stopping EnsōGen" )
649+
650+ self .update_stats ()
651+
652+ self .pktgen_cmd = None
653+
654+ def close (self ) -> None :
655+ # No need to close here.
656+ pass
657+
658+ def get_pcap_mean_pkt_size (self , pcap_path : str ) -> float :
659+ cmd_str = f"{ self .nic .enso_path } /{ GET_PCAP_SIZE_CMD } { pcap_path } "
660+ cmd = self .nic .host .run_command (cmd_str , pty = True )
661+ output = cmd .watch (keyboard_int = lambda : cmd .send ("\x03 " ))
662+ status = cmd .recv_exit_status ()
663+
664+ if status != 0 :
665+ raise RuntimeError (f'Error processing pcap: "{ output } "' )
666+
667+ mean_pcap_pkt_size = float (output )
668+ return mean_pcap_pkt_size
0 commit comments