From 75cc11f791d081c0c4bd235415163d7af513b0cb Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Thu, 17 Aug 2023 17:20:06 +0900 Subject: [PATCH 1/3] ct sync optimizations --- loxinet/dpbroker.go | 31 ++++++++++++++++++++++- loxinet/dpebpf_linux.go | 55 ++++++++++++++++++++++++++++++----------- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/loxinet/dpbroker.go b/loxinet/dpbroker.go index 494a657a1..835996729 100644 --- a/loxinet/dpbroker.go +++ b/loxinet/dpbroker.go @@ -545,7 +545,11 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int { rpcCallStr = "XSync.DpWorkOnCtAdd" } } else if op == DpSyncDelete { - rpcCallStr = "XSync.DpWorkOnCtDelete" + if len(blkCti) > 0 { + rpcCallStr = "XSync.DpWorkOnBlockCtDelete" + } else { + rpcCallStr = "XSync.DpWorkOnCtDelete" + } } else if op == DpSyncGet { rpcCallStr = "XSync.DpWorkOnCtGet" } else { @@ -632,6 +636,7 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { *ret = 0 + tk.LogIt(tk.LogDebug, "RPC - Block Add CT Start Len %d\n", len(blockCtis)) mh.dp.DpHooks.DpGetLock() for _, cti := range blockCtis { @@ -682,6 +687,30 @@ func (xs *XSync) DpWorkOnCtAdd(cti DpCtInfo, ret *int) error { return nil } +// DpWorkOnBlockCtDelete - Delete block CT entries from remote +func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error { + if !mh.ready { + return errors.New("Not-Ready") + } + + *ret = 0 + + mh.dp.DpHooks.DpGetLock() + + for _, cti := range blockCtis { + + tk.LogIt(tk.LogDebug, "RPC - Block CT Del %s\n", cti.Key()) + r := mh.dp.DpHooks.DpCtDel(&cti) + if r != 0 { + *ret = r + } + } + + mh.dp.DpHooks.DpRelLock() + + return nil +} + // DpWorkOnCtDelete - Delete a CT entry from remote func (xs *XSync) DpWorkOnCtDelete(cti DpCtInfo, ret *int) error { if !mh.ready { diff --git a/loxinet/dpebpf_linux.go b/loxinet/dpebpf_linux.go index 057fc0102..e4c67bc36 100644 --- a/loxinet/dpebpf_linux.go +++ b/loxinet/dpebpf_linux.go @@ -299,12 +299,16 @@ func (e *DpEbpfH) 
DpEbpfUnInit() { e.tDone <- true e.ToFinCh <- 1 + tk.LogIt(tk.LogInfo, "ebpf uninit \n") + // Make sure to unload eBPF programs ifList, err := net.Interfaces() if err != nil { return } + tk.LogIt(tk.LogInfo, "ebpf uninit begin\n") + for _, intf := range ifList { tk.LogIt(tk.LogInfo, "ebpf unload - %s\n", intf.Name) @@ -1651,16 +1655,16 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { cti.XSync = true cti.NTs = time.Now() // Immediately notify for delete - ret := mh.dp.DpXsyncRPC(DpSyncDelete, cti) - if ret == 0 { - delete(mh.dpEbpf.ctMap, cti.Key()) - // This is a strange fix - Sometimes loxilb runs as multiple docker - // instances in the same host. So, the map tracing infra can send notifications - // generated by other instances here. Depending on the timing, it is possible - // that the original deleter gets notified after it is handled in the remote - // instance. This is to handle such special cases. - C.llb_del_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0])) - } + //ret := mh.dp.DpXsyncRPC(DpSyncDelete, cti) + //if ret == 0 { + // delete(mh.dpEbpf.ctMap, cti.Key()) + // This is a strange fix - Sometimes loxilb runs as multiple docker + // instances in the same host. So, the map tracing infra can send notifications + // generated by other instances here. Depending on the timing, it is possible + // that the original deleter gets notified after it is handled in the remote + // instance. This is to handle such special cases. 
+ // C.llb_del_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0])) + //} } else { cte := mh.dpEbpf.ctMap[cti.Key()] if cte != nil { @@ -1715,10 +1719,13 @@ func dpCTMapChkUpdates() { var tact C.struct_dp_ct_tact var act *C.struct_dp_ct_dat var blkCti []DpCtInfo + var blkDelCti []DpCtInfo tc := time.Now() fd := C.llb_map2fd(C.LL_DP_CT_MAP) + tk.LogIt(tk.LogInfo, "[CT] Map size %d\n", len(mh.dpEbpf.ctMap)) + for _, cti := range mh.dpEbpf.ctMap { // tk.LogIt(tk.LogDebug, "[CT] check %s:%s:%v\n", cti.Key(), cti.CState, cti.XSync) if cti.CState != "est" { @@ -1730,7 +1737,7 @@ func dpCTMapChkUpdates() { act = &tact.ctd goCtEnt := new(DpCtInfo) goCtEnt.convDPCt2GoObj((*C.struct_dp_ct_key)(unsafe.Pointer(&cti.PKey[0])), act) - goCtEnt.LTs = time.Now() + goCtEnt.LTs = tc if goCtEnt.CState != cti.CState || goCtEnt.CAct != cti.CState { @@ -1750,7 +1757,7 @@ func dpCTMapChkUpdates() { tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", "update", ctStr) if goCtEnt.CState == "est" { goCtEnt.XSync = true - goCtEnt.NTs = time.Now() + goCtEnt.NTs = tc } continue } @@ -1799,7 +1806,8 @@ func dpCTMapChkUpdates() { ret := 0 if cti.Deleted > 0 { - ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti) + //ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti) + blkDelCti = append(blkDelCti, *cti) cti.Deleted++ } else { blkCti = append(blkCti, *cti) @@ -1815,12 +1823,29 @@ func dpCTMapChkUpdates() { } } } + + if len(blkCti) > 1024 { + tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") + mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) + blkCti = nil + } + + if len(blkDelCti) > 1024 { + tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n") + mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti) + blkDelCti = nil + } } if len(blkCti) > 0 { - tk.LogIt(tk.LogDebug, "[CT] Block Sync - \n") + tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) } + + if len(blkDelCti) > 0 { + tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n") + mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti) + } } // dpMapNotifierWorker - Work on any map 
notifications @@ -1921,7 +1946,7 @@ func (e *DpEbpfH) DpCtDel(w *DpCtInfo) int { cti := mh.dpEbpf.ctMap[mapKey] if cti == nil { tk.LogIt(tk.LogError, "ctInfo-key (%v) not present\n", mapKey) - return EbpfErrCtDel + return 0 } delete(mh.dpEbpf.ctMap, mapKey) From c7442973cc5b1319854be540eb501f70dfdc9542 Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Fri, 18 Aug 2023 13:49:34 +0900 Subject: [PATCH 2/3] ct sync optimizations --- loxilb-ebpf | 2 +- loxinet/dpbroker.go | 11 ----------- loxinet/dpebpf_linux.go | 18 +++++++++++++----- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/loxilb-ebpf b/loxilb-ebpf index ea174aca9..82bc922d7 160000 --- a/loxilb-ebpf +++ b/loxilb-ebpf @@ -1 +1 @@ -Subproject commit ea174aca9b419252fb6461b2fe423c98e32a4246 +Subproject commit 82bc922d7f27921b3bc76b4aded103abfab043dd diff --git a/loxinet/dpbroker.go b/loxinet/dpbroker.go index 835996729..7a46170b2 100644 --- a/loxinet/dpbroker.go +++ b/loxinet/dpbroker.go @@ -635,10 +635,6 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { } *ret = 0 - - tk.LogIt(tk.LogDebug, "RPC - Block Add CT Start Len %d\n", len(blockCtis)) - mh.dp.DpHooks.DpGetLock() - for _, cti := range blockCtis { tk.LogIt(tk.LogDebug, "RPC - Block CT Add %s\n", cti.Key()) @@ -648,8 +644,6 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error { } } - mh.dp.DpHooks.DpRelLock() - return nil } @@ -694,9 +688,6 @@ func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error { } *ret = 0 - - mh.dp.DpHooks.DpGetLock() - for _, cti := range blockCtis { tk.LogIt(tk.LogDebug, "RPC - Block CT Del %s\n", cti.Key()) @@ -706,8 +697,6 @@ func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error { } } - mh.dp.DpHooks.DpRelLock() - return nil } diff --git a/loxinet/dpebpf_linux.go b/loxinet/dpebpf_linux.go index e4c67bc36..0fceb0806 100644 --- a/loxinet/dpebpf_linux.go +++ b/loxinet/dpebpf_linux.go @@ -133,6 +133,7 @@ type DpEbpfH struct 
{ ticker *time.Ticker tDone chan bool ctBcast chan bool + nID uint tbN uint CtSync bool RssEn bool @@ -280,12 +281,13 @@ func DpEbpfInit(clusterEn bool, nodeNum int, rssEn bool, egrHooks bool, logLevel ne := new(DpEbpfH) ne.tDone = make(chan bool) - ne.ToMapCh = make(chan interface{}, DpWorkQLen) + ne.ToMapCh = make(chan interface{}, 65536) ne.ToFinCh = make(chan int) ne.ctBcast = make(chan bool) ne.ticker = time.NewTicker(DpEbpfLinuxTiVal * time.Second) ne.ctMap = make(map[string]*DpCtInfo) ne.RssEn = rssEn + ne.nID = uint((C.LLB_CT_MAP_ENTRIES / C.LLB_MAX_LB_NODES) * nodeNum) go dpEbpfTicker() go dpMapNotifierWorker(ne.ToFinCh, ne.ToMapCh) @@ -1608,6 +1610,9 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { if len(cti.PVal) != 0 { tact = (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0])) act = &tact.ctd + if (uint)(act.nid) != mh.dpEbpf.nID { + return + } addOp = true opStr = "Add" } else { @@ -1648,7 +1653,7 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { if addOp == false { cti = mh.dpEbpf.ctMap[mapKey] - if cti == nil { + if cti == nil || cti.Deleted > 0 { return } cti.Deleted = 1 @@ -1680,7 +1685,7 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) { } } - tk.LogIt(tk.LogInfo, "[CT] %s - %s\n", opStr, cti.String()) + tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", opStr, cti.String()) } func dpCTMapBcast() { @@ -1724,7 +1729,9 @@ func dpCTMapChkUpdates() { tc := time.Now() fd := C.llb_map2fd(C.LL_DP_CT_MAP) - tk.LogIt(tk.LogInfo, "[CT] Map size %d\n", len(mh.dpEbpf.ctMap)) + if len(mh.dpEbpf.ctMap) > 0 { + tk.LogIt(tk.LogDebug, "[CT] Map size %d\n", len(mh.dpEbpf.ctMap)) + } for _, cti := range mh.dpEbpf.ctMap { // tk.LogIt(tk.LogDebug, "[CT] check %s:%s:%v\n", cti.Key(), cti.CState, cti.XSync) @@ -1895,7 +1902,8 @@ func (e *DpEbpfH) DpCtAdd(w *DpCtInfo) int { // Fix few things ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&w.PVal[0])) - ptact.ctd.rid = C.uint(r.ruleNum) // Race-condition here + ptact.ctd.rid = C.ushort(r.ruleNum) // Race-condition here + ptact.ctd.nid = 
C.uint(mh.dpEbpf.nID) ptact.lts = C.get_os_nsecs() mh.dpEbpf.mtx.Lock() From 35cc625016e5dc13f7ac261604991595b74b5e69 Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Fri, 18 Aug 2023 15:28:41 +0900 Subject: [PATCH 3/3] ct sync optimizations --- loxinet/dpebpf_linux.go | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/loxinet/dpebpf_linux.go b/loxinet/dpebpf_linux.go index 0fceb0806..fe18fec2c 100644 --- a/loxinet/dpebpf_linux.go +++ b/loxinet/dpebpf_linux.go @@ -1790,19 +1790,21 @@ func dpCTMapChkUpdates() { } if C.bpf_map_lookup_elem(C.int(fd), unsafe.Pointer(&cti.PKey[0]), unsafe.Pointer(&tact)) != 0 { tk.LogIt(tk.LogInfo, "[CT] ent not found %s\n", cti.Key()) - delete(mh.dpEbpf.ctMap, cti.Key()) - continue - } - ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0])) - ret := C.llb_fetch_map_stats_cached(C.int(C.LL_DP_CT_STATS_MAP), C.uint(ptact.ca.cidx), C.int(0), - (unsafe.Pointer(&b)), unsafe.Pointer(&p)) - if ret == 0 { - if cti.Packets != p+uint64(tact.ctd.pb.packets) { - cti.Bytes = b + uint64(tact.ctd.pb.bytes) - cti.Packets = p + uint64(tact.ctd.pb.packets) - cti.XSync = true - cti.NTs = tc - cti.LTs = tc + //delete(mh.dpEbpf.ctMap, cti.Key()) + cti.Deleted++ + cti.XSync = true + } else { + ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0])) + ret := C.llb_fetch_map_stats_cached(C.int(C.LL_DP_CT_STATS_MAP), C.uint(ptact.ca.cidx), C.int(0), + (unsafe.Pointer(&b)), unsafe.Pointer(&p)) + if ret == 0 { + if cti.Packets != p+uint64(tact.ctd.pb.packets) { + cti.Bytes = b + uint64(tact.ctd.pb.bytes) + cti.Packets = p + uint64(tact.ctd.pb.packets) + cti.XSync = true + cti.NTs = tc + cti.LTs = tc + } } } } @@ -1833,7 +1835,10 @@ func dpCTMapChkUpdates() { if len(blkCti) > 1024 { tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") + tc1 := time.Now() mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) + tc2 := time.Now() + tk.LogIt(tk.LogInfo, "[CT] Block Add Sync %d took %v- \n", len(blkCti), 
time.Duration(tc2.Sub(tc1))) blkCti = nil } @@ -1845,8 +1850,11 @@ func dpCTMapChkUpdates() { } if len(blkCti) > 0 { + tc1 := time.Now() tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n") mh.dp.DpXsyncRPC(DpSyncAdd, blkCti) + tc2 := time.Now() + tk.LogIt(tk.LogInfo, "[CT] Block Add Sync %d took %v- \n", len(blkCti), time.Duration(tc2.Sub(tc1))) } if len(blkDelCti) > 0 {