 #
 *******************************************************************************/
 #include <fstream>
+#include <algorithm>
+#include <chrono>
+#include <thread>
 #include <json/json.h>
 
 #ifdef ACCL_NETWORK_UTILS_MPI
@@ -191,9 +192,9 @@ void configure_vnx(vnx::CMAC &cmac, vnx::Networklayer &network_layer,
   }
 }
 
-void configure_tcp(FPGABuffer<int8_t> &tx_buf_network, FPGABuffer<int8_t> &rx_buf_network,
+void configure_tcp(XRTBuffer<int8_t> &tx_buf_network, XRTBuffer<int8_t> &rx_buf_network,
                    xrt::kernel &network_krnl, xrt::kernel &session_krnl,
-                   const std::vector<rank_t> &ranks, int local_rank) {
+                   std::vector<rank_t> &ranks, int local_rank) {
   tx_buf_network.sync_to_device();
   rx_buf_network.sync_to_device();
 
@@ -211,27 +212,185 @@ void configure_tcp(FPGABuffer<int8_t> &tx_buf_network, FPGABuffer<int8_t> &rx_bu
      << std::dec << std::endl;
   log_debug(ss.str());
 
-  // set up sessions for ranks
-  for (size_t i = 0; i < ranks.size(); ++i) {
-    bool success;
-    if (i == static_cast<size_t>(local_rank)) {
-      continue;
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  // Open a listening port for each rank's port number on this rank
+  for (std::size_t i = 0; i < ranks.size(); i++) {
+    uint8_t tmp_ret_code = 0;
+    uint16_t tmp_session_id = static_cast<uint16_t>(ranks[i].session_id);
+    xrt::run run = session_krnl(
+      static_cast<uint32_t>(ip_encode(ranks[i].ip)),
+      static_cast<uint16_t>(ranks[i].port),
+      &tmp_session_id,
+      &tmp_ret_code,
+      tcpSessionHandlerOperation::OPEN_PORT
+    );
+    run.wait();
+    // The return code is exposed through the kernel register at offset 0x30
+    uint32_t ret_code = session_krnl.read_register(0x30);
+    if (!ret_code) {
+      throw std::runtime_error(
+        "Failed to open port: " + std::to_string(ranks[i].port) +
+        " from local rank: " + std::to_string(local_rank)
+      );
+    } else {
+      std::cout << "Successfully opened port: " << std::to_string(ranks[i].port)
+                << " from local rank: " << std::to_string(local_rank) << std::endl;
     }
-    session_krnl(ranks[i].ip, ranks[i].port, false,
-                 &(ranks[i].session_id), &success);
-    if (!success) {
-      throw std::runtime_error("Failed to establish session for IP:" +
-                               ranks[i].ip +
-                               " port: " +
-                               std::to_string(ranks[i].port));
+  }
+
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  // Open TCP connections to all other ranks
+  for (std::size_t i = 0; i < ranks.size(); i++) {
+    if (i == static_cast<std::size_t>(local_rank)) continue;
+    uint8_t tmp_ret_code = 0;
+    uint16_t tmp_session_id = static_cast<uint16_t>(ranks[i].session_id);
+    xrt::run run = session_krnl(
+      static_cast<uint32_t>(ip_encode(ranks[i].ip)),
+      static_cast<uint16_t>(ranks[i].port),
+      &tmp_session_id,
+      &tmp_ret_code,
+      tcpSessionHandlerOperation::OPEN_CONNECTION
+    );
+    run.wait();
+    // Return code and updated session ID are exposed through kernel registers
+    uint32_t ret_code = session_krnl.read_register(0x30);
+    uint32_t updated_session = session_krnl.read_register(0x28);
+    if (!ret_code) {
+      throw std::runtime_error(
+        "Failed to establish session for IP: " + ranks[i].ip +
+        " port: " + std::to_string(ranks[i].port) +
+        " from local rank: " + std::to_string(local_rank)
+      );
+    } else {
+      std::cout << "Successfully opened session: " << updated_session
+                << " to IP: " << ranks[i].ip
+                << " port: " << std::to_string(ranks[i].port)
+                << " from local rank: " << std::to_string(local_rank) << std::endl;
+      // Record the session ID assigned by the hardware so later ACCL setup
+      // can refer to it (ranks is taken by non-const reference for this)
+      ranks[i].session_id = updated_session;
     }
-    std::ostringstream ss;
-    ss << "Established session ID: " << ranks[i].session_id << std::endl;
-    log_debug(ss.str());
   }
+}
+
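+// Illustrative usage sketch (hypothetical surrounding code, not part of this
+// change): configure_tcp expects the network kernel and session-handler kernel
+// from the same xclbin, plus the 64 MiB Tx/Rx staging buffers, e.g.:
+//
+//   auto session_krnl = xrt::kernel(device, xclbin_uuid,
+//       "tcp_session_handler:{session_handler_0}",
+//       xrt::kernel::cu_access_mode::exclusive);
+//   configure_tcp(*tx_buf_network, *rx_buf_network, network_krnl,
+//                 session_krnl, ranks, local_rank);
+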
+void exchange_qp(unsigned int master_rank, unsigned int slave_rank, unsigned int local_rank, std::vector<fpga::ibvQpConn*> &ibvQpConn_vec, std::vector<ACCL::rank_t> &ranks) {
+
+  if (local_rank == master_rank)
+  {
+    std::cout << "Local rank " << local_rank << " sending local QP to remote rank " << slave_rank << std::endl;
+    // Send the local queue pair information to the slave rank
+    MPI_Send(&(ibvQpConn_vec[slave_rank]->getQpairStruct()->local), sizeof(fpga::ibvQ), MPI_CHAR, slave_rank, 0, MPI_COMM_WORLD);
+  }
+  else if (local_rank == slave_rank)
+  {
+    std::cout << "Local rank " << local_rank << " receiving remote QP from remote rank " << master_rank << std::endl;
+    // Receive the queue pair information from the master rank
+    fpga::ibvQ received_q;
+    MPI_Recv(&received_q, sizeof(fpga::ibvQ), MPI_CHAR, master_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+    // Copy the received data to the remote queue pair
+    ibvQpConn_vec[master_rank]->getQpairStruct()->remote = received_q;
+  }
+
+  // Synchronize after the first exchange to avoid race conditions
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  if (local_rank == slave_rank)
+  {
+    std::cout << "Local rank " << local_rank << " sending local QP to remote rank " << master_rank << std::endl;
+    // Send the local queue pair information to the master rank
+    MPI_Send(&(ibvQpConn_vec[master_rank]->getQpairStruct()->local), sizeof(fpga::ibvQ), MPI_CHAR, master_rank, 0, MPI_COMM_WORLD);
+  }
+  else if (local_rank == master_rank)
+  {
+    std::cout << "Local rank " << local_rank << " receiving remote QP from remote rank " << slave_rank << std::endl;
+    // Receive the queue pair information from the slave rank
+    fpga::ibvQ received_q;
+    MPI_Recv(&received_q, sizeof(fpga::ibvQ), MPI_CHAR, slave_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+    // Copy the received data to the remote queue pair
+    ibvQpConn_vec[slave_rank]->getQpairStruct()->remote = received_q;
+  }
+
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  // Write the established connection to hardware and perform an ARP lookup
+  if (local_rank == master_rank)
+  {
+    int connection = (ibvQpConn_vec[slave_rank]->getQpairStruct()->local.qpn & 0xFFFF) | ((ibvQpConn_vec[slave_rank]->getQpairStruct()->remote.qpn & 0xFFFF) << 16);
+    ibvQpConn_vec[slave_rank]->getQpairStruct()->print();
+    ibvQpConn_vec[slave_rank]->setConnection(connection);
+    ibvQpConn_vec[slave_rank]->writeContext(ranks[slave_rank].port);
+    ibvQpConn_vec[slave_rank]->doArpLookup();
+    ranks[slave_rank].session_id = ibvQpConn_vec[slave_rank]->getQpairStruct()->local.qpn;
+  } else if (local_rank == slave_rank)
+  {
+    int connection = (ibvQpConn_vec[master_rank]->getQpairStruct()->local.qpn & 0xFFFF) | ((ibvQpConn_vec[master_rank]->getQpairStruct()->remote.qpn & 0xFFFF) << 16);
+    ibvQpConn_vec[master_rank]->getQpairStruct()->print();
+    ibvQpConn_vec[master_rank]->setConnection(connection);
+    ibvQpConn_vec[master_rank]->writeContext(ranks[master_rank].port);
+    ibvQpConn_vec[master_rank]->doArpLookup();
+    ranks[master_rank].session_id = ibvQpConn_vec[master_rank]->getQpairStruct()->local.qpn;
+  }
+
+  MPI_Barrier(MPI_COMM_WORLD);
+}
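+// Worked example of the connection word built above, with illustrative QP
+// numbers: local.qpn = 0x0011 and remote.qpn = 0x0022 yield
+//   connection = (0x0011 & 0xFFFF) | ((0x0022 & 0xFFFF) << 16) = 0x00220011,
+// i.e. the local QPN in the low 16 bits and the remote QPN in the high 16 bits.
+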
 
+void configure_cyt_rdma(std::vector<ACCL::rank_t> &ranks, int local_rank, ACCL::CoyoteDevice *device) {
+
+  std::cout << "Initializing QP connections..." << std::endl;
+  // Create queue pair connections
+  std::vector<fpga::ibvQpConn*> ibvQpConn_vec;
+  // Create a single-page dummy memory space for each QP
+  uint32_t n_pages = 1;
+  for (std::size_t i = 0; i < ranks.size(); i++)
+  {
+    fpga::ibvQpConn *qpConn = new fpga::ibvQpConn(device->coyote_qProc_vec[i], ranks[local_rank].ip, n_pages);
+    ibvQpConn_vec.push_back(qpConn);
+    // qpConn->getQpairStruct()->print();
+  }
+
+  std::cout << "Exchanging QP..." << std::endl;
+  for (std::size_t i = 0; i < ranks.size(); i++)
+  {
+    for (std::size_t j = i + 1; j < ranks.size(); j++)
+    {
+      exchange_qp(i, j, local_rank, ibvQpConn_vec, ranks);
+    }
+  }
 }
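+// For N ranks the nested loop above calls exchange_qp once per unordered pair,
+// i.e. N * (N - 1) / 2 times; all ranks enter every call, since exchange_qp
+// contains collective MPI_Barrier calls. For example, N = 3 exchanges the
+// pairs (0,1), (0,2), (1,2).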
 
+void configure_cyt_tcp(std::vector<ACCL::rank_t> &ranks, int local_rank, ACCL::CoyoteDevice *device) {
+  std::cout << "Configuring Coyote TCP..." << std::endl;
+  // ARP lookup for every remote rank
+  for (std::size_t i = 0; i < ranks.size(); i++) {
+    if (static_cast<std::size_t>(local_rank) != i) {
+      device->get_device()->doArpLookup(ip_encode(ranks[i].ip));
+    }
+  }
+
+  // Open a listening port for every rank
+  for (std::size_t i = 0; i < ranks.size(); i++)
+  {
+    uint32_t dstPort = ranks[i].port;
+    bool open_port_status = device->get_device()->tcpOpenPort(dstPort);
+    if (!open_port_status) {
+      throw std::runtime_error("Failed to open port: " + std::to_string(dstPort));
+    }
+  }
+
+  // Give all ranks time to open their ports before connecting
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  // Open a connection to every other rank and record the assigned session ID
+  for (std::size_t i = 0; i < ranks.size(); i++)
+  {
+    uint32_t dstPort = ranks[i].port;
+    uint32_t dstIp = ip_encode(ranks[i].ip);
+    uint32_t session = 0;
+    if (static_cast<std::size_t>(local_rank) != i)
+    {
+      bool success = device->get_device()->tcpOpenCon(dstIp, dstPort, &session);
+      if (!success) {
+        throw std::runtime_error("Failed to open connection to rank " + std::to_string(i));
+      }
+      ranks[i].session_id = session;
+    }
+  }
+}
+
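+// Illustrative usage sketch (hypothetical setup code, not part of this change):
+//
+//   ACCL::CoyoteDevice *device = /* obtain a configured CoyoteDevice */;
+//   if (use_rdma)
+//     configure_cyt_rdma(ranks, local_rank, device);  // QP exchange over MPI
+//   else
+//     configure_cyt_tcp(ranks, local_rank, device);   // ARP + port + connect
+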
 std::vector<std::string> get_ips(fs::path config_file) {
   std::vector<std::string> ips{};
   Json::Value config;
@@ -290,15 +449,17 @@ std::vector<rank_t> generate_ranks(bool local, int local_rank, int world_size,
 }
 
 std::unique_ptr<ACCL::ACCL>
-initialize_accl(const std::vector<rank_t> &ranks, int local_rank,
+initialize_accl(std::vector<rank_t> &ranks, int local_rank,
                 bool simulator, acclDesign design, xrt::device device,
-                fs::path xclbin, int nbufs, addr_t bufsize, addr_t segsize,
-                bool rsfec) {
+                fs::path xclbin, unsigned int nbufs, unsigned int bufsize,
+                unsigned int egrsize, bool rsfec) {
   std::size_t world_size = ranks.size();
   std::unique_ptr<ACCL::ACCL> accl;
 
-  if (segsize == 0) {
-    segsize = bufsize;
+  // Default the eager size to the buffer size; if a larger eager size is
+  // requested, grow the buffers to match
+  if (egrsize == 0) {
+    egrsize = bufsize;
+  } else if (egrsize > bufsize) {
+    bufsize = egrsize;
   }
 
   if (simulator) {
@@ -342,13 +503,13 @@ initialize_accl(const std::vector<rank_t> &ranks, int local_rank,
     // Tx and Rx buffers will not be cleaned up properly and leak memory.
     // They need to live at least as long as ACCL so for now this is the best
     // we can do without requiring the users to allocate the buffers manually.
-    auto tx_buf_network = new FPGABuffer<int8_t>(
+    auto tx_buf_network = new XRTBuffer<int8_t>(
         64 * 1024 * 1024, dataType::int8, device, networkmem);
-    auto rx_buf_network = new FPGABuffer<int8_t>(
+    auto rx_buf_network = new XRTBuffer<int8_t>(
        64 * 1024 * 1024, dataType::int8, device, networkmem);
     auto network_krnl =
-        xrt::kernel(device, xclbin_uuid, "network_krnl:{network_krnl_0}",
-                    xrt::kernel::cu_access_mode::exclusive);
+        xrt::kernel(device, xclbin_uuid, "network_krnl:{poe_0}",
+                    xrt::kernel::cu_access_mode::exclusive);
     auto session_krnl =
         xrt::kernel(device, xclbin_uuid, "tcp_session_handler:{session_handler_0}",
                     xrt::kernel::cu_access_mode::exclusive);
@@ -358,7 +519,7 @@ initialize_accl(const std::vector<rank_t> &ranks, int local_rank,
 
     accl = std::make_unique<ACCL::ACCL>(device, cclo_ip, hostctrl_ip, devicemem, rxbufmem);
   }
-  accl.get()->initialize(ranks, local_rank, nbufs, bufsize, segsize);
+  accl.get()->initialize(ranks, local_rank, nbufs, bufsize, egrsize, std::min(nbufs * bufsize, (unsigned int)4 * 1024 * 1024));
   return accl;
 }
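+// Worked example of the cap above: with nbufs = 16 and bufsize = 1 MiB,
+// std::min(16 MiB, 4 MiB) passes 4 MiB to initialize; a smaller configuration
+// such as nbufs = 2, bufsize = 1 MiB passes 2 MiB through unchanged.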
 } // namespace accl_network_utils