From 43f887f1a5c9153ed675477e0146426f8da01a47 Mon Sep 17 00:00:00 2001 From: Junxiao Shi Date: Mon, 1 Jul 2024 18:21:21 -0400 Subject: [PATCH] pdump: EthPortSource on non-isolated RxFlow --- app/pdump/README.md | 3 +- app/pdump/ethport.go | 10 ++----- app/pdump/gql.go | 5 ++-- cmd/ndndpdk-ctrl/README.md | 2 +- cmd/ndndpdk-ctrl/pdump.go | 4 +-- core/README.md | 1 + csrc/core/common.h | 5 +--- csrc/ethface/face.c | 27 +++++++++++++---- csrc/ethface/rxtable.c | 15 +++++----- csrc/ethface/rxtable.h | 1 - csrc/iface/rxloop.c | 6 +++- csrc/pdump/source.c | 2 ++ csrc/pdump/source.h | 60 ++++++++++++++++++++++++++++++++++++++ dpdk/ealinit/logging.go | 5 ++-- go.mod | 2 +- go.sum | 4 +-- iface/ethport/rxtable.go | 9 ------ 17 files changed, 111 insertions(+), 50 deletions(-) diff --git a/app/pdump/README.md b/app/pdump/README.md index 7746290e0..ffca70e0a 100644 --- a/app/pdump/README.md +++ b/app/pdump/README.md @@ -44,8 +44,7 @@ SLL is chosen instead of Ethernet because: **EthPortSource** type defines a packet dump source attached to an [Ethernet port](../../iface/ethport), at a specific grab opportunity. Currently, the only supported grab opportunity is *RxUnmatched*: it captures incoming packets on an Ethernet port that does not match any face. -It is referenced by **EthRxTable** table type in an RCU protected pointer. -Hence, this feature is only supported on Ethernet ports that use RxTable receive path. +This is supported on RxTable receive path, as well as RxFlow receive path when not in flow isolation mode. In the output file, each Ethernet port appears as a network interface. Packets are written as Ethernet link type, with the original Ethernet headers. diff --git a/app/pdump/ethport.go b/app/pdump/ethport.go index 9ab6d6b91..6c75daeca 100644 --- a/app/pdump/ethport.go +++ b/app/pdump/ethport.go @@ -2,7 +2,6 @@ package pdump /* #include "../../csrc/pdump/source.h" -#include "../../csrc/ethface/rxtable.h" */ import "C" import ( @@ -36,8 +35,6 @@ type EthPortConfig struct { Writer *Writer Port *ethport.Port Grab EthGrab - - rxt *C.EthRxTable } func (cfg *EthPortConfig) validate() error { @@ -49,8 +46,6 @@ func (cfg *EthPortConfig) validate() error { if cfg.Port == nil { errs = append(errs, errors.New("port not found")) - } else if cfg.rxt = (*C.EthRxTable)(ethport.RxTablePtrFromPort(cfg.Port)); cfg.rxt == nil { - errs = append(errs, errors.New("port is not using RxTable")) } if cfg.Grab != EthGrabRxUnmatched { @@ -68,7 +63,7 @@ type EthPortSource struct { } func (s *EthPortSource) setRef(expected, newPtr *C.PdumpSource) { - setSourceRef(&s.rxt.pdumpUnmatched, expected, newPtr) + setSourceRef(&C.gPdumpEthPortSources[s.Port.EthDev().ID()], expected, newPtr) } // Close detaches the dump source. @@ -92,7 +87,7 @@ func (s *EthPortSource) closeImpl() error { return nil } -// NewEthPortSource creates a EthPortSource. +// NewEthPortSource creates an EthPortSource. func NewEthPortSource(cfg EthPortConfig) (s *EthPortSource, e error) { if e := cfg.validate(); e != nil { return nil, e @@ -138,6 +133,7 @@ func NewEthPortSource(cfg EthPortConfig) (s *EthPortSource, e error) { } func init() { + // C PdumpWriter.intf field is indexed by both EthDevID and FaceID, so they must not overlap if ethdev.MaxEthDevs > iface.MinID { panic("FaceID and EthDevID must not overlap") } diff --git a/app/pdump/gql.go b/app/pdump/gql.go index f65ff2d70..65ce39b77 100644 --- a/app/pdump/gql.go +++ b/app/pdump/gql.go @@ -234,9 +234,8 @@ func init() { GetID: func(s *EthPortSource) string { return strconv.Itoa(s.Port.EthDev().ID()) }, - Retrieve: func(id string) *EthPortSource { - ethDev := ethdev.GqlEthDevType.Retrieve(id) - port := ethport.Find(ethDev) + RetrieveInt: func(id int) *EthPortSource { + port := ethport.Find(ethdev.FromID(id)) if port == nil { return nil } diff --git a/cmd/ndndpdk-ctrl/README.md b/cmd/ndndpdk-ctrl/README.md index feadf010e..0cd4f605d 100644 --- a/cmd/ndndpdk-ctrl/README.md +++ b/cmd/ndndpdk-ctrl/README.md @@ -35,4 +35,4 @@ This command only exposes a subset of API functionality to perform certain commo It is not intended to expose every feature or every output field that is possible with the API. If you want additional functionality or more output fields, you should prepare and send GraphQL queries directly. -There are many GraphQL tools such as [graphqurl](https://www.npmjs.com/package/graphqurl) and [Altair GraphQL Client](https://altair.sirmuel.design/) that may be helpful. +There are many GraphQL tools such as [graphqurl](https://www.npmjs.com/package/graphqurl) and [Altair GraphQL Client](https://altairgraphql.dev) that may be helpful. diff --git a/cmd/ndndpdk-ctrl/pdump.go b/cmd/ndndpdk-ctrl/pdump.go index 81e1edf45..1eaac0385 100644 --- a/cmd/ndndpdk-ctrl/pdump.go +++ b/cmd/ndndpdk-ctrl/pdump.go @@ -115,8 +115,8 @@ func init() { } } closeAll := func(c *cli.Context) { - for _, faceSource := range sources { - runDeleteCommand(c, faceSource) + for _, source := range sources { + runDeleteCommand(c, source) } if writer != "" { runDeleteCommand(c, writer) diff --git a/core/README.md b/core/README.md index 96db5c33b..b11613fff 100644 --- a/core/README.md +++ b/core/README.md @@ -16,6 +16,7 @@ Go shared code: * cptr: handle C `void*` pointers. * dlopen: load dynamic libraries. * events: simple event emitter. +* gqlclient: GraphQL client. * gqlserver: GraphQL server. * hwinfo: hardware information gathering. * jsonhelper: JSON encoding and decoding. diff --git a/csrc/core/common.h b/csrc/core/common.h index 072aff4f6..17142c5c0 100644 --- a/csrc/core/common.h +++ b/csrc/core/common.h @@ -75,10 +75,7 @@ #endif #ifdef NDNDPDK_POISON -#define POISON_2_(x, size) \ - do { \ - memset((x), 0x99, size); \ - } while (false) +#define POISON_2_(x, size) memset(x, 0x99, size) #else #define POISON_2_(x, size) \ do { \ diff --git a/csrc/ethface/face.c b/csrc/ethface/face.c index 18eb5f982..da8aaaabd 100644 --- a/csrc/ethface/face.c +++ b/csrc/ethface/face.c @@ -12,25 +12,40 @@ enum { }; __attribute__((nonnull)) static __rte_always_inline void -EthRxFlow_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx, bool skipCheck) { +EthRxFlow_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx, bool isolated) { EthRxFlow* rxf = container_of(rxg, EthRxFlow, base); ctx->nRx = rte_eth_rx_burst(rxf->port, rxf->queue, ctx->pkts, rxf->burstSize); uint64_t now = rte_get_tsc_cycles(); + PdumpEthPortUnmatchedCtx unmatch; + if (isolated) { + PdumpEthPortUnmatchedCtx_Disable(&unmatch); + } else { + // RCU lock is inherited from RxLoop_Run + PdumpEthPortUnmatchedCtx_Init(&unmatch, rxf->port); + } + for (uint16_t i = 0; i < ctx->nRx; ++i) { struct rte_mbuf* m = ctx->pkts[i]; - if (skipCheck || likely(EthRxMatch_Match(rxf->rxMatch, m))) { - Mbuf_SetTimestamp(m, now); + Mbuf_SetTimestamp(m, now); + if (isolated || likely(EthRxMatch_Match(rxf->rxMatch, m))) { m->port = rxf->faceID; rte_pktmbuf_adj(m, rxf->hdrLen); } else { RxGroupBurstCtx_Drop(ctx, i); + if (PdumpEthPortUnmatchedCtx_Append(&unmatch, m)) { + ctx->pkts[i] = NULL; + } } } + + if (!isolated) { + PdumpEthPortUnmatchedCtx_Process(&unmatch); + } } __attribute__((nonnull)) static void -EthRxFlow_RxBurst_Unchecked(RxGroup* rxg, RxGroupBurstCtx* ctx) { +EthRxFlow_RxBurst_Isolated(RxGroup* rxg, RxGroupBurstCtx* ctx) { EthRxFlow_RxBurst(rxg, ctx, true); } @@ -83,7 +98,7 @@ EthFace_SetupFlow(EthFacePriv* priv, const uint16_t queues[], int nQueues, const *rxf = (const EthRxFlow){ .base = { - .rxBurst = isolated ? EthRxFlow_RxBurst_Unchecked : EthRxFlow_RxBurst_Checked, + .rxBurst = isolated ? EthRxFlow_RxBurst_Isolated : EthRxFlow_RxBurst_Checked, .rxThread = i, }, .faceID = priv->faceID, @@ -100,7 +115,7 @@ EthFace_SetupFlow(EthFacePriv* priv, const uint16_t queues[], int nQueues, const __attribute__((nonnull)) void EthFace_SetupRxMemif(EthFacePriv* priv, const EthLocator* loc) { priv->rxf[0] = (const EthRxFlow){ - .base = {.rxBurst = EthRxFlow_RxBurst_Unchecked, .rxThread = 0}, + .base = {.rxBurst = EthRxFlow_RxBurst_Isolated, .rxThread = 0}, .faceID = priv->faceID, .port = priv->port, .queue = 0, diff --git a/csrc/ethface/rxtable.c b/csrc/ethface/rxtable.c index 1ddc0bab3..eb8960fff 100644 --- a/csrc/ethface/rxtable.c +++ b/csrc/ethface/rxtable.c @@ -19,20 +19,21 @@ EthRxTable_Accept(EthRxTable* rxt, struct rte_mbuf* m) { void EthRxTable_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx) { EthRxTable* rxt = container_of(rxg, EthRxTable, base); - PdumpSource* pdumpUnmatched = PdumpSourceRef_Get(&rxt->pdumpUnmatched); ctx->nRx = rte_eth_rx_burst(rxt->port, rxt->queue, ctx->pkts, RTE_DIM(ctx->pkts)); uint64_t now = rte_get_tsc_cycles(); - struct rte_mbuf* unmatch[MaxBurstSize]; + PdumpEthPortUnmatchedCtx unmatch; + // RCU lock is inherited from RxLoop_Run + PdumpEthPortUnmatchedCtx_Init(&unmatch, rxt->port); + struct rte_mbuf* bounceBufs[MaxBurstSize]; - uint16_t nUnmatch = 0, nBounceBufs = 0; + uint16_t nBounceBufs = 0; for (uint16_t i = 0; i < ctx->nRx; ++i) { struct rte_mbuf* m = ctx->pkts[i]; Mbuf_SetTimestamp(m, now); if (unlikely(!EthRxTable_Accept(rxt, m))) { RxGroupBurstCtx_Drop(ctx, i); - if (pdumpUnmatched != NULL) { - unmatch[nUnmatch++] = m; + if (PdumpEthPortUnmatchedCtx_Append(&unmatch, m)) { ctx->pkts[i] = NULL; } else if (rxt->copyTo != NULL) { // free bounce bufs locally instead of via RxLoop, because rte_pktmbuf_free_bulk is most @@ -55,9 +56,7 @@ EthRxTable_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx) { bounceBufs[nBounceBufs++] = m; } - if (unlikely(pdumpUnmatched != NULL && nUnmatch > 0)) { - PdumpSource_Process(pdumpUnmatched, unmatch, nUnmatch); - } + PdumpEthPortUnmatchedCtx_Process(&unmatch); if (unlikely(nBounceBufs > 0)) { rte_pktmbuf_free_bulk(bounceBufs, nBounceBufs); } diff --git a/csrc/ethface/rxtable.h b/csrc/ethface/rxtable.h index 33a98d362..5f227aacd 100644 --- a/csrc/ethface/rxtable.h +++ b/csrc/ethface/rxtable.h @@ -11,7 +11,6 @@ typedef struct EthRxTable { RxGroup base; struct cds_hlist_head head; struct rte_mempool* copyTo; - PdumpSourceRef pdumpUnmatched; uint16_t port; uint16_t queue; } EthRxTable; diff --git a/csrc/iface/rxloop.c b/csrc/iface/rxloop.c index 33982c051..d1ef70ccf 100644 --- a/csrc/iface/rxloop.c +++ b/csrc/iface/rxloop.c @@ -14,7 +14,11 @@ RxLoop_Transfer(RxLoop* rxl, RxGroup* rxg) { bool dropped = (ctx.dropBits[i >> 6] & (1 << (i & 0x3F))) != 0; if (unlikely(dropped)) { - drops[nDrops++] = pkt; + if (likely(pkt != NULL)) { + drops[nDrops++] = pkt; + } else { + // pkt was passed to pdump or freed as bounceBufs in EthRxTable_RxBurst + } continue; } diff --git a/csrc/pdump/source.c b/csrc/pdump/source.c index ecc4069ba..2f6d7139b 100644 --- a/csrc/pdump/source.c +++ b/csrc/pdump/source.c @@ -54,3 +54,5 @@ PdumpFaceSource_Filter(PdumpSource* s0, struct rte_mbuf* pkt) { return prob > 0 && // skip pcg32 computation when there's no name match prob >= pcg32_random_r(&s->rng); // '>=' because UINT32_MAX means always } + +PdumpSourceRef gPdumpEthPortSources[RTE_MAX_ETHPORTS]; diff --git a/csrc/pdump/source.h b/csrc/pdump/source.h index 860da8779..307bb9d16 100644 --- a/csrc/pdump/source.h +++ b/csrc/pdump/source.h @@ -72,7 +72,67 @@ typedef struct PdumpFaceSource { uint8_t nameV[PdumpMaxNames * NameMaxLength]; } PdumpFaceSource; +/** + * @brief Perform name filtering to deterine whether to capture a packet. + * @param pkt packet positioned at NDNLPv2 header. + * @returns whether to capture the packet. + */ __attribute__((nonnull)) bool PdumpFaceSource_Filter(PdumpSource* s, struct rte_mbuf* pkt); +extern PdumpSourceRef gPdumpEthPortSources[RTE_MAX_ETHPORTS]; + +/** @brief Packet dump for unmatched frames on an Ethernet port, contextual information. */ +typedef struct PdumpEthPortUnmatchedCtx { + PdumpSource* source; + uint16_t count; + struct rte_mbuf* pkts[MaxBurstSize]; +} PdumpEthPortUnmatchedCtx; + +/** @brief Initialize PdumpEthPortUnmatchedCtx to be disabled. */ +static __rte_always_inline void +PdumpEthPortUnmatchedCtx_Disable(PdumpEthPortUnmatchedCtx* ctx) { + ctx->source = NULL; + ctx->count = 0; + POISON(ctx->pkts); +} + +/** + * @brief Initialize PdumpEthPortUnmatchedCtx for an ethdev. + * @pre Calling thread holds rcu_read_lock. + */ +static __rte_always_inline void +PdumpEthPortUnmatchedCtx_Init(PdumpEthPortUnmatchedCtx* ctx, uint16_t port) { + PdumpSourceRef* ref = &gPdumpEthPortSources[port]; + ctx->source = PdumpSourceRef_Get(ref); + ctx->count = 0; + POISON(ctx->pkts); +} + +/** + * @brief Append an Ethernet frame to be captured. + * @param pkt Ethernet frame. + * @retval true packet is accepted and owned by pdump. + * @retval false packet is rejected and should be freed by caller. + */ +static __rte_always_inline bool +PdumpEthPortUnmatchedCtx_Append(PdumpEthPortUnmatchedCtx* ctx, struct rte_mbuf* pkt) { + if (ctx->source == NULL) { + return false; + } + ctx->pkts[ctx->count++] = pkt; + return true; +} + +/** @brief Submit accumulated packets to the pdump writer. */ +static __rte_always_inline void +PdumpEthPortUnmatchedCtx_Process(PdumpEthPortUnmatchedCtx* ctx) { + if (ctx->source == NULL || ctx->count == 0) { + return; + } + PdumpSource_Process(ctx->source, ctx->pkts, ctx->count); + NULLize(ctx->source); + POISON(ctx->pkts); +} + #endif // NDNDPDK_PDUMP_SOURCE_H diff --git a/dpdk/ealinit/logging.go b/dpdk/ealinit/logging.go index 2070d0e31..05ba670cc 100644 --- a/dpdk/ealinit/logging.go +++ b/dpdk/ealinit/logging.go @@ -7,7 +7,6 @@ import "C" import ( "bufio" "bytes" - "errors" "math" "regexp" "strconv" @@ -192,7 +191,7 @@ func processLogLine(line []byte) { if len(m[reLogLineError]) > 0 { e := string(m[reLogLineError][len(logErrorPrefix) : len(m[reLogLineError])-len(logErrorSuffix)]) if e == "-" { - fields = append(fields, zap.Error(errors.New(msg))) + fields = append(fields, zap.String("error", msg)) } else if em := reErrno.FindStringSubmatch(e); em != nil { errno, _ := strconv.ParseUint(em[reErrnoErrno], 10, 64) err := unix.Errno(errno) @@ -202,7 +201,7 @@ func processLogLine(line []byte) { zap.Error(err), ) } else { - fields = append(fields, zap.Error(errors.New(e))) + fields = append(fields, zap.String("error", e)) } } diff --git a/go.mod b/go.mod index 1a6c0222b..be646dab2 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/functionalfoundry/graphqlws v0.0.0-20200611113535-7bc58903ce7b github.com/gogf/greuse v1.1.0 github.com/gopacket/gopacket v1.2.0 - github.com/gorilla/schema v1.4.0 + github.com/gorilla/schema v1.4.1 github.com/graphql-go/graphql v0.8.1 github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240626161320-231a28f62b43 github.com/jacobsa/fuse v0.0.0-20240626143436-8a36813dc074 diff --git a/go.sum b/go.sum index c9d0066bf..3aefcd054 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/gopacket/gopacket v1.2.0 h1:eXbzFad7f73P1n2EJHQlsKuvIMJjVXK5tXoSca78I github.com/gopacket/gopacket v1.2.0/go.mod h1:BrAKEy5EOGQ76LSqh7DMAr7z0NNPdczWm2GxCG7+I8M= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= -github.com/gorilla/schema v1.4.0 h1:l2N+lRTJtev9SUhBtj6NmSxd/6+8LhvN0kV+H2Y8R9k= -github.com/gorilla/schema v1.4.0/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM= +github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E= +github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/iface/ethport/rxtable.go b/iface/ethport/rxtable.go index 271d6b98b..f0771e8c7 100644 --- a/iface/ethport/rxtable.go +++ b/iface/ethport/rxtable.go @@ -98,12 +98,3 @@ func newRxgTable(port *Port) (rxt *rxgTable) { iface.ActivateRxGroup(rxt) return rxt } - -// RxTablePtrFromPort extracts *C.RxTable pointer from Port. -func RxTablePtrFromPort(port *Port) unsafe.Pointer { - impl, ok := port.rxImpl.(*rxTable) - if !ok { - return nil - } - return unsafe.Pointer(impl.rxt) -}