Skip to content

Commit

Permalink
fetch: truncate file to exact size upon stop
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Dec 5, 2024
1 parent b9212e5 commit b000f17
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 84 deletions.
41 changes: 41 additions & 0 deletions app/fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,44 @@
This package is the congestion aware fetcher, used in the [traffic generator](../tg).
It implements a consumer that follows the TCP CUBIC congestion control algorithm, simulating traffic patterns similar to bulk file transfer.
It requires at least one thread, running the `FetchThread_Run` function.

## Fetch Task Definition

**TaskDef** defines a fetch task that retrieves one segmented object.
A *segmented object* is a group of NDN packets, which have a common name prefix and have SegmentNameComponent as the last component.
The TaskDef contains these fields:

* Prefix: a name prefix except the last SegmentNameComponent.
* Importantly, if you are retrieving from the [file server](../fileserver), this field must end with the VersionNameComponent.
* InterestLifetime
* HopLimit
* SegmentRange: retrieve a consecutive subset of the available segments.
* If the fetcher encounters a Data packet whose FinalBlockId equals its last name component, the fetching will terminate at this segment, even if the upper bound of SegmentRange has not been reached.

Normally, a fetch task generates traffic similar to bulk file transfer, in which contents of the received packets are discarded.
It is however possible to write the received payload into a file.
In this case, the TaskDef additionally contains these fields:

* Filename: output file name.
* FileSize: total file size.
* SegmentLen: the payload length in every segment; the last segment may be shorter.

## Fetcher and its Workers

A **worker** is a thread running the `FetchThread_Run` function.
It can simultaneously process zero or more fetch tasks, which are arranged in an RCU-protected linked list.
It has an io\_uring handle in order to write payload to files when requested.

A **TaskContext** stores information of an ongoing fetch task, which can be initialized from a TaskDef.
It includes a **taskSlot** (aka **C.FetchTask**) used by C code, along with several Go objects.
It is responsible for opening and closing the file, if the TaskDef requests to write payload to a file.
Each taskSlot has an index number that is used as the PIT token for its Interests, which allows the reply Data packets to come back to the same taskSlot.

**FetchLogic** contained within the taskSlot implements the algorithmic part of the fetch procedure.
It includes an RTT estimator, a CUBIC-like congestion control implementation, and a retransmission queue.
It makes decisions on when to transmit an Interest for a certain segment number, and gets notified about when the Data arrives with or without a congestion mark.
Nack packets are not considered in the congestion aware fetcher.

**Fetcher** is the top level.
It controls one or more workers and owns one or more task slots.
An incoming TaskDef is placed into an unused task slot, and then added to the worker with the least number of ongoing fetch tasks.
2 changes: 1 addition & 1 deletion app/fetch/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (fetcher *Fetcher) Reset() {
w.ClearTasks()
}
for _, ts := range fetcher.taskSlots {
ts.closeFd()
ts.closeFd(nil)
ts.worker = -1
}
maps.DeleteFunc(taskContextByID, func(id int, task *TaskContext) bool { return task.fetcher == fetcher })
Expand Down
242 changes: 163 additions & 79 deletions app/fetch/fetchtest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,106 +16,190 @@ import (
"github.com/usnistgov/ndn-dpdk/ndn/tlv"
)

const testFetcherWindowCapacity = 512

// testFetcherTask defines a simulated segmented object for the fetcher to
// retrieve, and records the Interests observed while serving it.
type testFetcherTask struct {
	fetch.TaskDef // the fetch task submitted to the Fetcher

	// FinalBlock is the segment number whose Data carries FinalBlockId;
	// a negative value disables FinalBlockId (set by newTestFetcherTask).
	FinalBlock int64
	// Payload is the served object content; nil when content does not matter.
	Payload []byte
	// PInterests counts received Interests per segment number.
	PInterests map[tlv.NNI]int
	// NInterests counts all received Interests, including retransmissions.
	NInterests int
}

// Run submits the TaskDef to fetcher, polls every millisecond until the task
// reports finished, then stops the task and returns its counters.
// When the TaskDef writes to a file, the file content is compared against the
// expected payload.
func (ft *testFetcherTask) Run(t *testing.T, fetcher *fetch.Fetcher) (cnt fetch.Counters) {
	t.Parallel()
	assert, require := makeAR(t)

	task, e := fetcher.Fetch(ft.TaskDef)
	require.NoError(e)

	started := time.Now()
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		<-ticker.C
		if task.Finished() {
			break
		}
	}
	task.Stop()

	cnt = task.Counters()
	t.Logf("Interests %d (unique %d) in %v", ft.NInterests, len(ft.PInterests), time.Since(started))
	t.Logf("Counters %v", cnt)

	if ft.Filename != "" {
		file, e := os.Open(ft.Filename)
		if assert.NoError(e) {
			defer file.Close()
			content, e := io.ReadAll(file)
			assert.NoError(e)
			assert.Equal(ft.Payload, content)
		}
	}

	return
}

// Serve records an Interest for segNum and fills data as its response.
// It returns false when the Interest should not be answered (segment is past
// FinalBlock). It panics when segNum falls outside the task's SegmentRange.
func (ft *testFetcherTask) Serve(segNum tlv.NNI, lastComp ndn.NameComponent, data *ndn.Data) bool {
	ft.NInterests++
	ft.PInterests[segNum]++

	seg := uint64(segNum)
	if seg < ft.SegmentBegin || (ft.SegmentEnd > 0 && seg >= ft.SegmentEnd) {
		panic(segNum)
	}

	if ft.Payload != nil {
		begin := int(segNum) * ft.SegmentLen
		end := min(begin+ft.SegmentLen, len(ft.Payload)) // last segment may be shorter
		data.Content = ft.Payload[begin:end]
	}

	switch {
	case ft.FinalBlock < 0:
		// FinalBlockId disabled: always respond.
	case int64(segNum) == ft.FinalBlock:
		data.FinalBlock = lastComp
	case int64(segNum) > ft.FinalBlock:
		return false
	}
	return true
}

// newTestFetcherTask creates a task whose name prefix is a single-character
// component, with FinalBlockId disabled and Interest counters initialized.
func newTestFetcherTask(prefix rune) *testFetcherTask {
	ft := &testFetcherTask{
		FinalBlock: -1,
		PInterests: map[tlv.NNI]int{},
	}
	ft.Prefix = ndn.ParseName("/" + string(prefix))
	return ft
}

func TestFetcher(t *testing.T) {
assert, require := makeAR(t)

intFace := intface.MustNew()
defer intFace.D.Close()
t.Cleanup(func() { intFace.D.Close() })

var cfg fetch.Config
cfg.NThreads = 1
cfg.NTasks = 2
cfg.WindowCapacity = 512
cfg.NThreads = 2
cfg.NTasks = 8
cfg.WindowCapacity = testFetcherWindowCapacity

fetcher, e := fetch.New(intFace.D, cfg)
require.NoError(e)
tgtestenv.Open(t, fetcher)
defer fetcher.Close()
t.Cleanup(func() { fetcher.Close() })
fetcher.Launch()

var defA, defB fetch.TaskDef
defA.Prefix = ndn.ParseName("/A")
defA.SegmentBegin, defA.SegmentEnd = 0, 5000
defA.Filename, defA.SegmentLen = filepath.Join(t.TempDir(), "A.bin"), 100
payloadA := make([]byte, int(defA.SegmentEnd)*defA.SegmentLen)
randBytes(payloadA)
defB.Prefix = ndn.ParseName("/B")
defB.SegmentBegin, defB.SegmentEnd = 1000, 4000
const finalBlockB = 1800

pInterestsA, nInterestsA, pInterestsB, nInterestsB := map[tlv.NNI]int{}, 0, map[tlv.NNI]int{}, 0
tempDir := t.TempDir()
ftByName := map[rune]*testFetcherTask{}

t.Run("0", func(t *testing.T) {
assert, _ := makeAR(t)

ft := newTestFetcherTask('0')
ft.SegmentBegin, ft.SegmentEnd = 1000, 1000
ftByName['0'] = ft
// empty SegmentRange

cnt := ft.Run(t, fetcher)
assert.Zero(cnt.NRxData)
assert.Zero(len(ft.PInterests))
})

t.Run("A", func(t *testing.T) {
assert, _ := makeAR(t)

ft := newTestFetcherTask('A')
ft.SegmentBegin, ft.SegmentEnd = 1000, 4000
ft.FinalBlock = 1800
ftByName['A'] = ft
// bounded by both SegmentRange and FinalBlock

cnt := ft.Run(t, fetcher)
assert.EqualValues(ft.FinalBlock-int64(ft.SegmentBegin)+1, cnt.NRxData)
nUniqueInterests := int64(len(ft.PInterests))
assert.GreaterOrEqual(nUniqueInterests, ft.FinalBlock-int64(ft.SegmentBegin)+1)
assert.Less(nUniqueInterests, ft.FinalBlock-int64(ft.SegmentBegin)+testFetcherWindowCapacity)
})

t.Run("H", func(t *testing.T) {
assert, _ := makeAR(t)

ft := newTestFetcherTask('H')
ft.SegmentBegin, ft.SegmentEnd, ft.SegmentLen = 0, 5000, 100
ft.Filename = filepath.Join(tempDir, "H.bin")
ft.Payload = make([]byte, int64(ft.SegmentLen)*int64(ft.SegmentEnd))
randBytes(ft.Payload)
ftByName['H'] = ft
// bounded by SegmentRange, write to file

cnt := ft.Run(t, fetcher)
assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, cnt.NRxData)
assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, len(ft.PInterests))
assert.Zero(cnt.NInFlight)
assert.InDelta(float64(ft.NInterests), float64(cnt.NTxRetx+cnt.NRxData), testFetcherWindowCapacity)
})

t.Run("I", func(t *testing.T) {
assert, _ := makeAR(t)

ft := newTestFetcherTask('I')
ft.SegmentBegin, ft.SegmentEnd, ft.SegmentLen = 0, 900, 400
ft.Filename = filepath.Join(tempDir, "I.bin")
fileSize := int64(ft.SegmentLen)*int64(ft.SegmentEnd) - 7
ft.FileSize = &fileSize
ft.Payload = make([]byte, fileSize)
randBytes(ft.Payload)
ftByName['I'] = ft
// bounded by SegmentRange, write to file, truncate file

cnt := ft.Run(t, fetcher)
assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, cnt.NRxData)
assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, len(ft.PInterests))
assert.Zero(cnt.NInFlight)
assert.InDelta(float64(ft.NInterests), float64(cnt.NTxRetx+cnt.NRxData), testFetcherWindowCapacity)
})

go func() {
for packet := range intFace.Rx {
require.NotNil(packet.Interest)
if !assert.NotNil(packet.Interest) || !assert.Len(packet.Interest.Name, 2) {
continue
}
data := ndn.MakeData(packet.Interest, time.Millisecond)

lastComp := packet.Interest.Name.Get(-1)
assert.EqualValues(an.TtSegmentNameComponent, lastComp.Type)
comp0, comp1 := packet.Interest.Name[0], packet.Interest.Name[1]
assert.EqualValues(an.TtGenericNameComponent, comp0.Type)
assert.EqualValues(1, comp0.Length())
assert.EqualValues(an.TtSegmentNameComponent, comp1.Type)
var segNum tlv.NNI
assert.NoError(segNum.UnmarshalBinary(lastComp.Value))

switch {
case defA.Prefix.IsPrefixOf(packet.Interest.Name):
nInterestsA++
pInterestsA[segNum]++
assert.Less(uint64(segNum), defA.SegmentEnd)
payloadOffset := int(segNum) * defA.SegmentLen
data.Content = payloadA[payloadOffset : payloadOffset+defA.SegmentLen]
case defB.Prefix.IsPrefixOf(packet.Interest.Name):
nInterestsB++
pInterestsB[segNum]++
assert.GreaterOrEqual(uint64(segNum), defB.SegmentBegin)
assert.Less(uint64(segNum), defB.SegmentEnd)
if segNum == finalBlockB {
data.FinalBlock = lastComp
} else if segNum > finalBlockB {
continue
}
default:
assert.NoError(segNum.UnmarshalBinary(comp1.Value))

respond := false
ft := ftByName[rune(comp0.Value[0])]
if ft == nil {
assert.Fail("unexpected Interest", packet.Interest.Name)
} else {
respond = ft.Serve(segNum, comp1, &data)
}

if rand.Float64() > 0.01 {
if respond && rand.Float64() > 0.01 {
intFace.Tx <- data
}
}
}()

taskA, e := fetcher.Fetch(defA)
require.NoError(e)
taskB, e := fetcher.Fetch(defB)
require.NoError(e)

t0 := time.Now()
{
ticker := time.NewTicker(time.Millisecond)
for range ticker.C {
if taskA.Finished() && taskB.Finished() {
break
}
}
ticker.Stop()
}
taskA.Stop()
taskB.Stop()

cntA, cntB := taskA.Counters(), taskB.Counters()
assert.EqualValues(defA.SegmentEnd-defA.SegmentBegin, cntA.NRxData)
assert.EqualValues(defA.SegmentEnd-defA.SegmentBegin, len(pInterestsA))
assert.Zero(cntA.NInFlight)
assert.InDelta(float64(nInterestsA), float64(cntA.NTxRetx+cntA.NRxData), float64(cfg.WindowCapacity))

assert.EqualValues(finalBlockB-defB.SegmentBegin+1, cntB.NRxData)
assert.GreaterOrEqual(len(pInterestsB), int(finalBlockB-defB.SegmentBegin+1))
assert.Less(len(pInterestsB), int(finalBlockB-defB.SegmentBegin)+cfg.WindowCapacity)

t.Logf("/A Interests %d (unique %d) and /B Interests %d (unique %d) in %v",
nInterestsA, len(pInterestsA), nInterestsB, len(pInterestsB), time.Since(t0))

if fA, e := os.Open(defA.Filename); assert.NoError(e) {
defer fA.Close()
writtenA, e := io.ReadAll(fA)
assert.NoError(e)
assert.Equal(payloadA, writtenA)
}
}
19 changes: 17 additions & 2 deletions app/fetch/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (task *TaskContext) Counters() Counters {
func (task *TaskContext) Stop() {
eal.CallMain(func() {
task.w.RemoveTask(eal.MainReadSide, task.ts)
task.ts.closeFd()
task.ts.closeFd(task.d.FileSize)
close(task.stopping)
taskContextLock.Lock()
defer taskContextLock.Unlock()
Expand Down Expand Up @@ -78,6 +78,11 @@ type TaskDef struct {
// If omitted, payload is not written to a file.
Filename string `json:"filename,omitempty"`

// FileSize is total payload length.
// This is only relevant when writing to a file.
// If set, the file will be truncated to this size after fetching is completed.
FileSize *int64 `json:"fileSize"`

// SegmentLen is the payload length in each segment.
// This is only needed when writing to a file.
// If any segment has incorrect Content TLV-LENGTH, the output file would not contain correct payload.
Expand Down Expand Up @@ -169,7 +174,7 @@ func (ts *taskSlot) Logic() *Logic {
return (*Logic)(&ts.logic)
}

func (ts *taskSlot) closeFd() {
func (ts *taskSlot) closeFd(fileSize *int64) {
fd := int(ts.fd)
if fd < 0 {
return
Expand All @@ -178,7 +183,17 @@ func (ts *taskSlot) closeFd() {
logEntry := logger.With(
zap.Int("slot-index", int(ts.index)),
zap.Int("fd", fd),
zap.Int64p("file-size", fileSize),
)

if fileSize != nil {
if e := unix.Ftruncate(fd, *fileSize); e != nil {
logEntry.Warn("unix.Ftruncate error",
zap.Error(e),
)
}
}

if e := unix.Close(fd); e != nil {
logEntry.Warn("unix.Close error",
zap.Error(e),
Expand Down
Loading

0 comments on commit b000f17

Please sign in to comment.