Skip to content

Commit

Permalink
pdump: EthPortSource on non-isolated RxFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jul 1, 2024
1 parent f6c6bce commit 43f887f
Show file tree
Hide file tree
Showing 17 changed files with 111 additions and 50 deletions.
3 changes: 1 addition & 2 deletions app/pdump/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ SLL is chosen instead of Ethernet because:

**EthPortSource** type defines a packet dump source attached to an [Ethernet port](../../iface/ethport), at a specific grab opportunity.
Currently, the only supported grab opportunity is *RxUnmatched*: it captures incoming packets on an Ethernet port that does not match any face.
It is referenced by **EthRxTable** table type in an RCU protected pointer.
Hence, this feature is only supported on Ethernet ports that use RxTable receive path.
This is supported on RxTable receive path, as well as RxFlow receive path when not in flow isolation mode.

In the output file, each Ethernet port appears as a network interface.
Packets are written as Ethernet link type, with the original Ethernet headers.
10 changes: 3 additions & 7 deletions app/pdump/ethport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pdump

/*
#include "../../csrc/pdump/source.h"
#include "../../csrc/ethface/rxtable.h"
*/
import "C"
import (
Expand Down Expand Up @@ -36,8 +35,6 @@ type EthPortConfig struct {
Writer *Writer
Port *ethport.Port
Grab EthGrab

rxt *C.EthRxTable
}

func (cfg *EthPortConfig) validate() error {
Expand All @@ -49,8 +46,6 @@ func (cfg *EthPortConfig) validate() error {

if cfg.Port == nil {
errs = append(errs, errors.New("port not found"))
} else if cfg.rxt = (*C.EthRxTable)(ethport.RxTablePtrFromPort(cfg.Port)); cfg.rxt == nil {
errs = append(errs, errors.New("port is not using RxTable"))
}

if cfg.Grab != EthGrabRxUnmatched {
Expand All @@ -68,7 +63,7 @@ type EthPortSource struct {
}

func (s *EthPortSource) setRef(expected, newPtr *C.PdumpSource) {
setSourceRef(&s.rxt.pdumpUnmatched, expected, newPtr)
setSourceRef(&C.gPdumpEthPortSources[s.Port.EthDev().ID()], expected, newPtr)
}

// Close detaches the dump source.
Expand All @@ -92,7 +87,7 @@ func (s *EthPortSource) closeImpl() error {
return nil
}

// NewEthPortSource creates a EthPortSource.
// NewEthPortSource creates an EthPortSource.
func NewEthPortSource(cfg EthPortConfig) (s *EthPortSource, e error) {
if e := cfg.validate(); e != nil {
return nil, e
Expand Down Expand Up @@ -138,6 +133,7 @@ func NewEthPortSource(cfg EthPortConfig) (s *EthPortSource, e error) {
}

func init() {
// C PdumpWriter.intf field is indexed by both EthDevID and FaceID, so they must not overlap
if ethdev.MaxEthDevs > iface.MinID {
panic("FaceID and EthDevID must not overlap")
}
Expand Down
5 changes: 2 additions & 3 deletions app/pdump/gql.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,8 @@ func init() {
GetID: func(s *EthPortSource) string {
return strconv.Itoa(s.Port.EthDev().ID())
},
Retrieve: func(id string) *EthPortSource {
ethDev := ethdev.GqlEthDevType.Retrieve(id)
port := ethport.Find(ethDev)
RetrieveInt: func(id int) *EthPortSource {
port := ethport.Find(ethdev.FromID(id))
if port == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ndndpdk-ctrl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ This command only exposes a subset of API functionality to perform certain commo
It is not intended to expose every feature or every output field that is possible with the API.

If you want additional functionality or more output fields, you should prepare and send GraphQL queries directly.
There are many GraphQL tools such as [graphqurl](https://www.npmjs.com/package/graphqurl) and [Altair GraphQL Client](https://altair.sirmuel.design/) that may be helpful.
There are many GraphQL tools such as [graphqurl](https://www.npmjs.com/package/graphqurl) and [Altair GraphQL Client](https://altairgraphql.dev) that may be helpful.
4 changes: 2 additions & 2 deletions cmd/ndndpdk-ctrl/pdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func init() {
}
}
closeAll := func(c *cli.Context) {
for _, faceSource := range sources {
runDeleteCommand(c, faceSource)
for _, source := range sources {
runDeleteCommand(c, source)
}
if writer != "" {
runDeleteCommand(c, writer)
Expand Down
1 change: 1 addition & 0 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Go shared code:
* cptr: handle C `void*` pointers.
* dlopen: load dynamic libraries.
* events: simple event emitter.
* gqlclient: GraphQL client.
* gqlserver: GraphQL server.
* hwinfo: hardware information gathering.
* jsonhelper: JSON encoding and decoding.
Expand Down
5 changes: 1 addition & 4 deletions csrc/core/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@
#endif

#ifdef NDNDPDK_POISON
#define POISON_2_(x, size) \
do { \
memset((x), 0x99, size); \
} while (false)
#define POISON_2_(x, size) memset(x, 0x99, size)
#else
#define POISON_2_(x, size) \
do { \
Expand Down
27 changes: 21 additions & 6 deletions csrc/ethface/face.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,40 @@ enum {
};

__attribute__((nonnull)) static __rte_always_inline void
EthRxFlow_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx, bool skipCheck) {
EthRxFlow_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx, bool isolated) {
EthRxFlow* rxf = container_of(rxg, EthRxFlow, base);
ctx->nRx = rte_eth_rx_burst(rxf->port, rxf->queue, ctx->pkts, rxf->burstSize);
uint64_t now = rte_get_tsc_cycles();

PdumpEthPortUnmatchedCtx unmatch;
if (isolated) {
PdumpEthPortUnmatchedCtx_Disable(&unmatch);
} else {
// RCU lock is inherited from RxLoop_Run
PdumpEthPortUnmatchedCtx_Init(&unmatch, rxf->port);
}

for (uint16_t i = 0; i < ctx->nRx; ++i) {
struct rte_mbuf* m = ctx->pkts[i];
if (skipCheck || likely(EthRxMatch_Match(rxf->rxMatch, m))) {
Mbuf_SetTimestamp(m, now);
Mbuf_SetTimestamp(m, now);
if (isolated || likely(EthRxMatch_Match(rxf->rxMatch, m))) {
m->port = rxf->faceID;
rte_pktmbuf_adj(m, rxf->hdrLen);
} else {
RxGroupBurstCtx_Drop(ctx, i);
if (PdumpEthPortUnmatchedCtx_Append(&unmatch, m)) {
ctx->pkts[i] = NULL;
}
}
}

if (!isolated) {
PdumpEthPortUnmatchedCtx_Process(&unmatch);
}
}

__attribute__((nonnull)) static void
EthRxFlow_RxBurst_Unchecked(RxGroup* rxg, RxGroupBurstCtx* ctx) {
EthRxFlow_RxBurst_Isolated(RxGroup* rxg, RxGroupBurstCtx* ctx) {
EthRxFlow_RxBurst(rxg, ctx, true);
}

Expand Down Expand Up @@ -83,7 +98,7 @@ EthFace_SetupFlow(EthFacePriv* priv, const uint16_t queues[], int nQueues, const
*rxf = (const EthRxFlow){
.base =
{
.rxBurst = isolated ? EthRxFlow_RxBurst_Unchecked : EthRxFlow_RxBurst_Checked,
.rxBurst = isolated ? EthRxFlow_RxBurst_Isolated : EthRxFlow_RxBurst_Checked,
.rxThread = i,
},
.faceID = priv->faceID,
Expand All @@ -100,7 +115,7 @@ EthFace_SetupFlow(EthFacePriv* priv, const uint16_t queues[], int nQueues, const
__attribute__((nonnull)) void
EthFace_SetupRxMemif(EthFacePriv* priv, const EthLocator* loc) {
priv->rxf[0] = (const EthRxFlow){
.base = {.rxBurst = EthRxFlow_RxBurst_Unchecked, .rxThread = 0},
.base = {.rxBurst = EthRxFlow_RxBurst_Isolated, .rxThread = 0},
.faceID = priv->faceID,
.port = priv->port,
.queue = 0,
Expand Down
15 changes: 7 additions & 8 deletions csrc/ethface/rxtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@ EthRxTable_Accept(EthRxTable* rxt, struct rte_mbuf* m) {
void
EthRxTable_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx) {
EthRxTable* rxt = container_of(rxg, EthRxTable, base);
PdumpSource* pdumpUnmatched = PdumpSourceRef_Get(&rxt->pdumpUnmatched);
ctx->nRx = rte_eth_rx_burst(rxt->port, rxt->queue, ctx->pkts, RTE_DIM(ctx->pkts));
uint64_t now = rte_get_tsc_cycles();

struct rte_mbuf* unmatch[MaxBurstSize];
PdumpEthPortUnmatchedCtx unmatch;
// RCU lock is inherited from RxLoop_Run
PdumpEthPortUnmatchedCtx_Init(&unmatch, rxt->port);

struct rte_mbuf* bounceBufs[MaxBurstSize];
uint16_t nUnmatch = 0, nBounceBufs = 0;
uint16_t nBounceBufs = 0;
for (uint16_t i = 0; i < ctx->nRx; ++i) {
struct rte_mbuf* m = ctx->pkts[i];
Mbuf_SetTimestamp(m, now);
if (unlikely(!EthRxTable_Accept(rxt, m))) {
RxGroupBurstCtx_Drop(ctx, i);
if (pdumpUnmatched != NULL) {
unmatch[nUnmatch++] = m;
if (PdumpEthPortUnmatchedCtx_Append(&unmatch, m)) {
ctx->pkts[i] = NULL;
} else if (rxt->copyTo != NULL) {
// free bounce bufs locally instead of via RxLoop, because rte_pktmbuf_free_bulk is most
Expand All @@ -55,9 +56,7 @@ EthRxTable_RxBurst(RxGroup* rxg, RxGroupBurstCtx* ctx) {
bounceBufs[nBounceBufs++] = m;
}

if (unlikely(pdumpUnmatched != NULL && nUnmatch > 0)) {
PdumpSource_Process(pdumpUnmatched, unmatch, nUnmatch);
}
PdumpEthPortUnmatchedCtx_Process(&unmatch);
if (unlikely(nBounceBufs > 0)) {
rte_pktmbuf_free_bulk(bounceBufs, nBounceBufs);
}
Expand Down
1 change: 0 additions & 1 deletion csrc/ethface/rxtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ typedef struct EthRxTable {
RxGroup base;
struct cds_hlist_head head;
struct rte_mempool* copyTo;
PdumpSourceRef pdumpUnmatched;
uint16_t port;
uint16_t queue;
} EthRxTable;
Expand Down
6 changes: 5 additions & 1 deletion csrc/iface/rxloop.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ RxLoop_Transfer(RxLoop* rxl, RxGroup* rxg) {

bool dropped = (ctx.dropBits[i >> 6] & (1 << (i & 0x3F))) != 0;
if (unlikely(dropped)) {
drops[nDrops++] = pkt;
if (likely(pkt != NULL)) {
drops[nDrops++] = pkt;
} else {
// pkt was passed to pdump or freed as bounceBufs in EthRxTable_RxBurst
}
continue;
}

Expand Down
2 changes: 2 additions & 0 deletions csrc/pdump/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,5 @@ PdumpFaceSource_Filter(PdumpSource* s0, struct rte_mbuf* pkt) {
return prob > 0 && // skip pcg32 computation when there's no name match
prob >= pcg32_random_r(&s->rng); // '>=' because UINT32_MAX means always
}

PdumpSourceRef gPdumpEthPortSources[RTE_MAX_ETHPORTS];
60 changes: 60 additions & 0 deletions csrc/pdump/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,67 @@ typedef struct PdumpFaceSource {
uint8_t nameV[PdumpMaxNames * NameMaxLength];
} PdumpFaceSource;

/**
* @brief Perform name filtering to deterine whether to capture a packet.
* @param pkt packet positioned at NDNLPv2 header.
* @returns whether to capture the packet.
*/
__attribute__((nonnull)) bool
PdumpFaceSource_Filter(PdumpSource* s, struct rte_mbuf* pkt);

extern PdumpSourceRef gPdumpEthPortSources[RTE_MAX_ETHPORTS];

/** @brief Packet dump for unmatched frames on an Ethernet port, contextual information. */
typedef struct PdumpEthPortUnmatchedCtx {
PdumpSource* source;
uint16_t count;
struct rte_mbuf* pkts[MaxBurstSize];
} PdumpEthPortUnmatchedCtx;

/** @brief Initialize PdumpEthPortUnmatchedCtx to be disabled. */
static __rte_always_inline void
PdumpEthPortUnmatchedCtx_Disable(PdumpEthPortUnmatchedCtx* ctx) {
ctx->source = NULL;
ctx->count = 0;
POISON(ctx->pkts);
}

/**
* @brief Initialize PdumpEthPortUnmatchedCtx for an ethdev.
* @pre Calling thread holds rcu_read_lock.
*/
static __rte_always_inline void
PdumpEthPortUnmatchedCtx_Init(PdumpEthPortUnmatchedCtx* ctx, uint16_t port) {
PdumpSourceRef* ref = &gPdumpEthPortSources[port];
ctx->source = PdumpSourceRef_Get(ref);
ctx->count = 0;
POISON(ctx->pkts);
}

/**
* @brief Append an Ethernet frame to be captured.
* @param pkt Ethernet frame.
* @retval true packet is accepted and owned by pdump.
* @retval false packet is rejected and should be freed by caller.
*/
static __rte_always_inline bool
PdumpEthPortUnmatchedCtx_Append(PdumpEthPortUnmatchedCtx* ctx, struct rte_mbuf* pkt) {
if (ctx->source == NULL) {
return false;
}
ctx->pkts[ctx->count++] = pkt;
return true;
}

/** @brief Submit accumulated packets to the pdump writer. */
static __rte_always_inline void
PdumpEthPortUnmatchedCtx_Process(PdumpEthPortUnmatchedCtx* ctx) {
if (ctx->source == NULL || ctx->count == 0) {
return;
}
PdumpSource_Process(ctx->source, ctx->pkts, ctx->count);
NULLize(ctx->source);
POISON(ctx->pkts);
}

#endif // NDNDPDK_PDUMP_SOURCE_H
5 changes: 2 additions & 3 deletions dpdk/ealinit/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import "C"
import (
"bufio"
"bytes"
"errors"
"math"
"regexp"
"strconv"
Expand Down Expand Up @@ -192,7 +191,7 @@ func processLogLine(line []byte) {
if len(m[reLogLineError]) > 0 {
e := string(m[reLogLineError][len(logErrorPrefix) : len(m[reLogLineError])-len(logErrorSuffix)])
if e == "-" {
fields = append(fields, zap.Error(errors.New(msg)))
fields = append(fields, zap.String("error", msg))
} else if em := reErrno.FindStringSubmatch(e); em != nil {
errno, _ := strconv.ParseUint(em[reErrnoErrno], 10, 64)
err := unix.Errno(errno)
Expand All @@ -202,7 +201,7 @@ func processLogLine(line []byte) {
zap.Error(err),
)
} else {
fields = append(fields, zap.Error(errors.New(e)))
fields = append(fields, zap.String("error", e))
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/functionalfoundry/graphqlws v0.0.0-20200611113535-7bc58903ce7b
github.com/gogf/greuse v1.1.0
github.com/gopacket/gopacket v1.2.0
github.com/gorilla/schema v1.4.0
github.com/gorilla/schema v1.4.1
github.com/graphql-go/graphql v0.8.1
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240626161320-231a28f62b43
github.com/jacobsa/fuse v0.0.0-20240626143436-8a36813dc074
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ github.com/gopacket/gopacket v1.2.0 h1:eXbzFad7f73P1n2EJHQlsKuvIMJjVXK5tXoSca78I
github.com/gopacket/gopacket v1.2.0/go.mod h1:BrAKEy5EOGQ76LSqh7DMAr7z0NNPdczWm2GxCG7+I8M=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/gorilla/schema v1.4.0 h1:l2N+lRTJtev9SUhBtj6NmSxd/6+8LhvN0kV+H2Y8R9k=
github.com/gorilla/schema v1.4.0/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM=
github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E=
github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Expand Down
9 changes: 0 additions & 9 deletions iface/ethport/rxtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,3 @@ func newRxgTable(port *Port) (rxt *rxgTable) {
iface.ActivateRxGroup(rxt)
return rxt
}

// RxTablePtrFromPort extracts *C.RxTable pointer from Port.
func RxTablePtrFromPort(port *Port) unsafe.Pointer {
impl, ok := port.rxImpl.(*rxTable)
if !ok {
return nil
}
return unsafe.Pointer(impl.rxt)
}

0 comments on commit 43f887f

Please sign in to comment.