Skip to content

Commit

Permalink
Merge pull request #375 from TrekkieCoder/main
Browse files Browse the repository at this point in the history
PR - ct sync optimizations
  • Loading branch information
UltraInstinct14 authored Aug 18, 2023
2 parents a81a590 + 35cc625 commit 3494a79
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 39 deletions.
2 changes: 1 addition & 1 deletion loxilb-ebpf
30 changes: 24 additions & 6 deletions loxinet/dpbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,11 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int {
rpcCallStr = "XSync.DpWorkOnCtAdd"
}
} else if op == DpSyncDelete {
rpcCallStr = "XSync.DpWorkOnCtDelete"
if len(blkCti) > 0 {
rpcCallStr = "XSync.DpWorkOnBlockCtDelete"
} else {
rpcCallStr = "XSync.DpWorkOnCtDelete"
}
} else if op == DpSyncGet {
rpcCallStr = "XSync.DpWorkOnCtGet"
} else {
Expand Down Expand Up @@ -631,9 +635,6 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error {
}

*ret = 0

mh.dp.DpHooks.DpGetLock()

for _, cti := range blockCtis {

tk.LogIt(tk.LogDebug, "RPC - Block CT Add %s\n", cti.Key())
Expand All @@ -643,8 +644,6 @@ func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error {
}
}

mh.dp.DpHooks.DpRelLock()

return nil
}

Expand Down Expand Up @@ -682,6 +681,25 @@ func (xs *XSync) DpWorkOnCtAdd(cti DpCtInfo, ret *int) error {
return nil
}

// DpWorkOnBlockCtDelete - Add block CT entries from remote
func (xs *XSync) DpWorkOnBlockCtDelete(blockCtis []DpCtInfo, ret *int) error {
if !mh.ready {
return errors.New("Not-Ready")
}

*ret = 0
for _, cti := range blockCtis {

tk.LogIt(tk.LogDebug, "RPC - Block CT Del %s\n", cti.Key())
r := mh.dp.DpHooks.DpCtDel(&cti)
if r != 0 {
*ret = r
}
}

return nil
}

// DpWorkOnCtDelete - Delete a CT entry from remote
func (xs *XSync) DpWorkOnCtDelete(cti DpCtInfo, ret *int) error {
if !mh.ready {
Expand Down
105 changes: 73 additions & 32 deletions loxinet/dpebpf_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type DpEbpfH struct {
ticker *time.Ticker
tDone chan bool
ctBcast chan bool
nID uint
tbN uint
CtSync bool
RssEn bool
Expand Down Expand Up @@ -280,12 +281,13 @@ func DpEbpfInit(clusterEn bool, nodeNum int, rssEn bool, egrHooks bool, logLevel

ne := new(DpEbpfH)
ne.tDone = make(chan bool)
ne.ToMapCh = make(chan interface{}, DpWorkQLen)
ne.ToMapCh = make(chan interface{}, 65536)
ne.ToFinCh = make(chan int)
ne.ctBcast = make(chan bool)
ne.ticker = time.NewTicker(DpEbpfLinuxTiVal * time.Second)
ne.ctMap = make(map[string]*DpCtInfo)
ne.RssEn = rssEn
ne.nID = uint((C.LLB_CT_MAP_ENTRIES / C.LLB_MAX_LB_NODES) * nodeNum)

go dpEbpfTicker()
go dpMapNotifierWorker(ne.ToFinCh, ne.ToMapCh)
Expand All @@ -299,12 +301,16 @@ func (e *DpEbpfH) DpEbpfUnInit() {
e.tDone <- true
e.ToFinCh <- 1

tk.LogIt(tk.LogInfo, "ebpf uninit \n")

// Make sure to unload eBPF programs
ifList, err := net.Interfaces()
if err != nil {
return
}

tk.LogIt(tk.LogInfo, "ebpf uninit begin\n")

for _, intf := range ifList {

tk.LogIt(tk.LogInfo, "ebpf unload - %s\n", intf.Name)
Expand Down Expand Up @@ -1604,6 +1610,9 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) {
if len(cti.PVal) != 0 {
tact = (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0]))
act = &tact.ctd
if (uint)(act.nid) != mh.dpEbpf.nID {
return
}
addOp = true
opStr = "Add"
} else {
Expand Down Expand Up @@ -1644,23 +1653,23 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) {

if addOp == false {
cti = mh.dpEbpf.ctMap[mapKey]
if cti == nil {
if cti == nil || cti.Deleted > 0 {
return
}
cti.Deleted = 1
cti.XSync = true
cti.NTs = time.Now()
// Immediately notify for delete
ret := mh.dp.DpXsyncRPC(DpSyncDelete, cti)
if ret == 0 {
delete(mh.dpEbpf.ctMap, cti.Key())
// This is a strange fix - Sometimes loxilb runs as multiple docker
// instances in the same host. So, the map tracing infra can send notifications
// generated by other instances here. Depending on the timing, it is possible
// that the original deleter gets notified after it is handled in the remote
// instance. This is to handle such special cases.
C.llb_del_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0]))
}
//ret := mh.dp.DpXsyncRPC(DpSyncDelete, cti)
//if ret == 0 {
// delete(mh.dpEbpf.ctMap, cti.Key())
// This is a strange fix - Sometimes loxilb runs as multiple docker
// instances in the same host. So, the map tracing infra can send notifications
// generated by other instances here. Depending on the timing, it is possible
// that the original deleter gets notified after it is handled in the remote
// instance. This is to handle such special cases.
// C.llb_del_map_elem(C.LL_DP_CT_MAP, unsafe.Pointer(&cti.PKey[0]))
//}
} else {
cte := mh.dpEbpf.ctMap[cti.Key()]
if cte != nil {
Expand All @@ -1676,7 +1685,7 @@ func dpCTMapNotifierWorker(cti *DpCtInfo) {
}
}

tk.LogIt(tk.LogInfo, "[CT] %s - %s\n", opStr, cti.String())
tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", opStr, cti.String())
}

func dpCTMapBcast() {
Expand Down Expand Up @@ -1715,10 +1724,15 @@ func dpCTMapChkUpdates() {
var tact C.struct_dp_ct_tact
var act *C.struct_dp_ct_dat
var blkCti []DpCtInfo
var blkDelCti []DpCtInfo

tc := time.Now()
fd := C.llb_map2fd(C.LL_DP_CT_MAP)

if len(mh.dpEbpf.ctMap) > 0 {
tk.LogIt(tk.LogDebug, "[CT] Map size %d\n", len(mh.dpEbpf.ctMap))
}

for _, cti := range mh.dpEbpf.ctMap {
// tk.LogIt(tk.LogDebug, "[CT] check %s:%s:%v\n", cti.Key(), cti.CState, cti.XSync)
if cti.CState != "est" {
Expand All @@ -1730,7 +1744,7 @@ func dpCTMapChkUpdates() {
act = &tact.ctd
goCtEnt := new(DpCtInfo)
goCtEnt.convDPCt2GoObj((*C.struct_dp_ct_key)(unsafe.Pointer(&cti.PKey[0])), act)
goCtEnt.LTs = time.Now()
goCtEnt.LTs = tc

if goCtEnt.CState != cti.CState ||
goCtEnt.CAct != cti.CState {
Expand All @@ -1750,7 +1764,7 @@ func dpCTMapChkUpdates() {
tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", "update", ctStr)
if goCtEnt.CState == "est" {
goCtEnt.XSync = true
goCtEnt.NTs = time.Now()
goCtEnt.NTs = tc
}
continue
}
Expand All @@ -1776,19 +1790,21 @@ func dpCTMapChkUpdates() {
}
if C.bpf_map_lookup_elem(C.int(fd), unsafe.Pointer(&cti.PKey[0]), unsafe.Pointer(&tact)) != 0 {
tk.LogIt(tk.LogInfo, "[CT] ent not found %s\n", cti.Key())
delete(mh.dpEbpf.ctMap, cti.Key())
continue
}
ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0]))
ret := C.llb_fetch_map_stats_cached(C.int(C.LL_DP_CT_STATS_MAP), C.uint(ptact.ca.cidx), C.int(0),
(unsafe.Pointer(&b)), unsafe.Pointer(&p))
if ret == 0 {
if cti.Packets != p+uint64(tact.ctd.pb.packets) {
cti.Bytes = b + uint64(tact.ctd.pb.bytes)
cti.Packets = p + uint64(tact.ctd.pb.packets)
cti.XSync = true
cti.NTs = tc
cti.LTs = tc
//delete(mh.dpEbpf.ctMap, cti.Key())
cti.Deleted++
cti.XSync = true
} else {
ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&cti.PVal[0]))
ret := C.llb_fetch_map_stats_cached(C.int(C.LL_DP_CT_STATS_MAP), C.uint(ptact.ca.cidx), C.int(0),
(unsafe.Pointer(&b)), unsafe.Pointer(&p))
if ret == 0 {
if cti.Packets != p+uint64(tact.ctd.pb.packets) {
cti.Bytes = b + uint64(tact.ctd.pb.bytes)
cti.Packets = p + uint64(tact.ctd.pb.packets)
cti.XSync = true
cti.NTs = tc
cti.LTs = tc
}
}
}
}
Expand All @@ -1799,7 +1815,8 @@ func dpCTMapChkUpdates() {

ret := 0
if cti.Deleted > 0 {
ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti)
//ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti)
blkDelCti = append(blkDelCti, *cti)
cti.Deleted++
} else {
blkCti = append(blkCti, *cti)
Expand All @@ -1815,11 +1832,34 @@ func dpCTMapChkUpdates() {
}
}
}

if len(blkCti) > 1024 {
tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n")
tc1 := time.Now()
mh.dp.DpXsyncRPC(DpSyncAdd, blkCti)
tc2 := time.Now()
tk.LogIt(tk.LogInfo, "[CT] Block Add Sync %d took %v- \n", len(blkCti), time.Duration(tc2.Sub(tc1)))
blkCti = nil
}

if len(blkDelCti) > 1024 {
tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n")
mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti)
blkDelCti = nil
}
}

if len(blkCti) > 0 {
tk.LogIt(tk.LogDebug, "[CT] Block Sync - \n")
tc1 := time.Now()
tk.LogIt(tk.LogDebug, "[CT] Block Add Sync - \n")
mh.dp.DpXsyncRPC(DpSyncAdd, blkCti)
tc2 := time.Now()
tk.LogIt(tk.LogInfo, "[CT] Block Add Sync %d took %v- \n", len(blkCti), time.Duration(tc2.Sub(tc1)))
}

if len(blkDelCti) > 0 {
tk.LogIt(tk.LogDebug, "[CT] Block Del Sync - \n")
mh.dp.DpXsyncRPC(DpSyncDelete, blkDelCti)
}
}

Expand Down Expand Up @@ -1870,7 +1910,8 @@ func (e *DpEbpfH) DpCtAdd(w *DpCtInfo) int {

// Fix few things
ptact := (*C.struct_dp_ct_tact)(unsafe.Pointer(&w.PVal[0]))
ptact.ctd.rid = C.uint(r.ruleNum) // Race-condition here
ptact.ctd.rid = C.ushort(r.ruleNum) // Race-condition here
ptact.ctd.nid = C.uint(mh.dpEbpf.nID)
ptact.lts = C.get_os_nsecs()

mh.dpEbpf.mtx.Lock()
Expand Down Expand Up @@ -1921,7 +1962,7 @@ func (e *DpEbpfH) DpCtDel(w *DpCtInfo) int {
cti := mh.dpEbpf.ctMap[mapKey]
if cti == nil {
tk.LogIt(tk.LogError, "ctInfo-key (%v) not present\n", mapKey)
return EbpfErrCtDel
return 0
}

delete(mh.dpEbpf.ctMap, mapKey)
Expand Down

0 comments on commit 3494a79

Please sign in to comment.