-
Notifications
You must be signed in to change notification settings - Fork 0
/
cpg.go
645 lines (535 loc) · 23.2 KB
/
cpg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
package mrnesbits
// file cpg.go holds structs, methods, and data structures related to the coordination of
// executing mrnesbits models through the APIs of computational pattern function instances
import (
"fmt"
"github.com/iti/evt/evtm"
"github.com/iti/evt/vrtime"
"github.com/iti/mrnes"
"github.com/iti/rngstream"
"math"
)
// CompPattern functions, messages, and edges are described by structs in the desc package,
// and at runtime are read into MrNesbits. Runtime data-structures (such as CmpPtnInst
// and CmpPtnMsg) are created from constructors that take desc structures as arguments.
// CmpPtnInstByName and CmpPtnInstByID map the name (alt., id) of an instance
// of a CompPattern to a pointer to its runtime struct. Both are filled in by
// createCmpPtnInst.
var (
	CmpPtnInstByName = make(map[string]*CmpPtnInst)
	CmpPtnInstByID   = make(map[int]*CmpPtnInst)
)
// execRecord notes where and when one execution trace began. startRecExec
// stores one per execID in a tracking group's active map, and EndRecExec
// reads the start field to compute the elapsed time.
type execRecord struct {
cpID int // identity of comp pattern starting this execution
src string // label of the func initiating the execution trace
start float64 // time the trace started
}
// execSummary accumulates statistics over the executions recorded for one
// tracking group (or one entry of CmpPtnInst.finished). EndRecExec updates
// every field each time an execution ends.
type execSummary struct {
n int // number of executions
samples []float64 // collection of samples, one measured execution time per ended execution
completed int // number of completions
sum float64 // sum of measured execution times of completed
sum2 float64 // sum of squared execution times
maxv float64 // largest value seen
minv float64 // least value seen
}
// CmpPtnInst describes a particular instance of a CompPattern,
// built from information read in from CompPatternDesc struct and used at runtime.
// Instances are built by createCmpPtnInst, which also registers them in
// CmpPtnInstByName and CmpPtnInstByID.
type CmpPtnInst struct {
name string // this instance's particular name
cpType string // NOTE(review): not assigned by createCmpPtnInst; presumably the CompPattern's type name, confirm
ID int // unique id
funcs map[string]*CmpPtnFuncInst // use func label to get to func in that pattern with that label
msgs map[string]CompPatternMsg // MsgType indexes msgs
Rngs *rngstream.RngStream // random number stream, created from the instance name
Graph *CmpPtnGraph // graph describing structure of funcs and edges
active map[int]execRecord // executions that are active now
activeCnt map[int]int // number of instances of executions with common execID (>1 by branching)
lostExec map[int]evtm.EventHandlerFunction // call this handler when a packet for a given execID is lost
finished map[string]execSummary // summary of completed executions
}
// createCmpPtnInst is a constructor. Inputs are given by two structs from the desc package:
// cpd describes the CompPattern, and cpid describes this instance's initialization parameters.
//
// cpd.Edges is a list of CmpPtnGraphEdge structs with attributes SrcLabel, MsgType, DstLabel,
// MethodCode, the assumption being that the labels are for funcs all in the CPI.
// cpd.ExtEdges is a map whose index is the name of some comp pattern instance, with
// attribute being a list of edges where the SrcLabel belongs to 'this' CPI and the DstLabel
// (and MethodCode) belong to the CPI whose name is the index.
//
// The new instance is registered in CmpPtnInstByName and CmpPtnInstByID before it is returned.
// The returned error is whatever createCmpPtnGraph reported; the instance is returned (and
// stays registered) even when that error is non-nil. The function panics if a function
// description carries a class that validFuncClass rejects.
func createCmpPtnInst(ptnInstName string, cpd CompPattern, cpid CPInitList) (*CmpPtnInst, error) {
	cpi := &CmpPtnInst{
		// the instance gets its name from the input list
		name: ptnInstName,
		// get assuredly unique id
		ID: nxtID(),
		// func instances that make up the CmpPtnInst, indexed by label
		funcs: make(map[string]*CmpPtnFuncInst),
		// copies of the messages for this CompPattern, indexed by MsgType
		msgs: make(map[string]CompPatternMsg),
		// information about active execution threads
		active: make(map[int]execRecord),
		// number of active execs with same execID
		activeCnt: make(map[int]int),
		// event handler to call if a packet on an execID is lost
		lostExec: make(map[int]evtm.EventHandlerFunction),
		// information about completed executions
		finished: make(map[string]execSummary),
	}

	// make a representation of the CmpPtnInst where funcs are nodes and edges are
	// labeled with message type
	var gerr error
	cpi.Graph, gerr = createCmpPtnGraph(&cpd)

	// enable access to this struct through its name, and through its id
	CmpPtnInstByName[cpi.name] = cpi
	CmpPtnInstByID[cpi.ID] = cpi

	// The cpd structure has a list of desc descriptions of functions.
	// Call a constructor for each and save the created runtime representation
	// of the function in the CmpPtnInst's table of funcs.
	for _, funcDesc := range cpd.Funcs {
		// Give each iteration its own copy: before Go 1.22 the range variable is
		// shared by all iterations, so a pointer to it retained by createFuncInst
		// would end up referring to the last description in the list.
		funcDesc := funcDesc

		if !validFuncClass(funcDesc.Class) {
			panic(fmt.Errorf("function class %s not recognized", funcDesc.Class))
		}
		df := createFuncInst(ptnInstName, cpi.ID, &funcDesc, cpid.State[funcDesc.Label], cpid.UseYAML)
		cpi.funcs[df.Label] = df
	}

	// save copies of all the messages for this CompPattern found in the
	// initialization struct's list of messages
	for _, msg := range cpid.Msgs {
		cpi.msgs[msg.MsgType] = msg
	}

	// create and save an rng
	cpi.Rngs = rngstream.New(cpi.name)

	// the edge tables of the function instances are filled in separately,
	// by buildAllEdgeTables
	return cpi, gerr
}
// buildAllEdgeTables goes through all the edges declared to the computational patterns
// and extracts from these the information needed to populate function instance data
// structures with what they need to recognize legitimate messages and call the right methods.
//
// For every function instance of every pattern in cpd it fills in
//   - inEdgeMethodCode: the method code to run for each (source CP id, source label,
//     message type) the function may receive, from internal and external edges alike;
//   - outEdges: the (CP id, destination label, message type) of each internal edge
//     leaving the function.
//
// Every pattern named in cpd must already have an instance in CmpPtnInstByName, and
// every label (and external pattern name) used by an edge must exist. Violations
// panic with a message that names the offending pattern or function.
func buildAllEdgeTables(cpd *CompPatternDict) {
	// categories an edge can fall in, relative to one function
	const (
		inEdgeKey    = "InEdge"
		outEdgeKey   = "OutEdge"
		extInEdgeKey = "ExtInEdge"
	)

	// organize edges by comp pattern, function label, and category
	cmpPtnEdges := make(map[string]map[string]map[string][]*ExtCmpPtnGraphEdge)

	// edgeLists returns the per-category edge lists of one function, panicking with
	// a descriptive message when an edge refers to something that was never declared
	// (instead of failing later on a write to a nil map).
	edgeLists := func(cpName, funcLabel string) map[string][]*ExtCmpPtnGraphEdge {
		byFunc, known := cmpPtnEdges[cpName]
		if !known {
			panic(fmt.Errorf("edge refers to comp pattern %s, which is not in the comp pattern dictionary", cpName))
		}
		lists, known := byFunc[funcLabel]
		if !known {
			panic(fmt.Errorf("edge refers to function %s, which comp pattern %s does not have", funcLabel, cpName))
		}
		return lists
	}

	// create an (empty) set of edge lists for every function of every pattern
	for cpName := range cpd.Patterns {
		cpi := CmpPtnInstByName[cpName]
		if cpi == nil {
			panic(fmt.Errorf("comp pattern %s has no instance to build edge tables for", cpName))
		}
		byFunc := make(map[string]map[string][]*ExtCmpPtnGraphEdge, len(cpi.funcs))
		for funcLabel := range cpi.funcs {
			byFunc[funcLabel] = make(map[string][]*ExtCmpPtnGraphEdge, 3)
		}
		cmpPtnEdges[cpName] = byFunc
	}

	// go through all edges and categorize each
	for cpName, cp := range cpd.Patterns {
		// go through the internal edges
		for _, edge := range cp.Edges {
			xEdge := &ExtCmpPtnGraphEdge{CPGE: edge}

			// every edge is an InEdge of its destination
			dst := edgeLists(cpName, edge.DstLabel)
			dst[inEdgeKey] = append(dst[inEdgeKey], xEdge)

			// an initiation edge (source equals destination) is not an OutEdge
			if edge.SrcLabel == edge.DstLabel {
				continue
			}
			src := edgeLists(cpName, edge.SrcLabel)
			src[outEdgeKey] = append(src[outEdgeKey], xEdge)
		}

		// go through the external edges. The SrcLabel refers to a function in cpName;
		// the DstLabel and MethodCode belong to the pattern named by NxtCP.
		for _, xList := range cp.ExtEdges {
			for _, xedge := range xList {
				xEdge := &ExtCmpPtnGraphEdge{
					SrcCP: cpName,
					CPGE: CmpPtnGraphEdge{SrcLabel: xedge.SrcLabel, MsgType: xedge.MsgType,
						DstLabel: xedge.DstLabel, MethodCode: xedge.MethodCode},
				}
				dst := edgeLists(xedge.NxtCP, xedge.DstLabel)
				dst[extInEdgeKey] = append(dst[extInEdgeKey], xEdge)
			}
		}
	}

	// now each comp function instance needs to have its inEdgeMethodCode and
	// outEdges tables created
	for cpName := range cpd.Patterns {
		cpi := CmpPtnInstByName[cpName]
		for fLabel, cpfi := range cpi.funcs {
			lists := cmpPtnEdges[cpName][fLabel]

			for _, edge := range lists[inEdgeKey] {
				es := createEdgeStruct(cpfi.CPID, edge.CPGE.SrcLabel, edge.CPGE.MsgType)
				cpfi.inEdgeMethodCode[es] = edge.CPGE.MethodCode
			}
			for _, edge := range lists[outEdgeKey] {
				es := createEdgeStruct(cpfi.CPID, edge.CPGE.DstLabel, edge.CPGE.MsgType)
				cpfi.outEdges = append(cpfi.outEdges, es)
			}
			// a message over an external edge is recognized by the id of the
			// pattern it comes from, not the id of the receiving pattern
			for _, edge := range lists[extInEdgeKey] {
				srcCPID := CmpPtnInstByName[edge.SrcCP].ID
				es := createEdgeStruct(srcCPID, edge.CPGE.SrcLabel, edge.CPGE.MsgType)
				cpfi.inEdgeMethodCode[es] = edge.CPGE.MethodCode
			}
		}
	}
}
// trackingGroup gathers the execution traces that are reported together under
// one name. Groups are created by createTrackingGroup and stored in
// allTrackingGroups.
type trackingGroup struct {
name string // name the group is registered under in allTrackingGroups
active map[int]execRecord // started-but-not-ended executions, indexed by execID
finished execSummary // statistics over the executions that have ended
activeCnt int // NOTE(review): not updated by startRecExec or EndRecExec; confirm whether it is used elsewhere
}
// allTrackingGroups finds a tracking group by its name, and execIDToTG finds
// the tracking group that an execution (identified by its execID) was started in.
var (
	allTrackingGroups = make(map[string]*trackingGroup)
	execIDToTG        = make(map[int]*trackingGroup)
)
// createTrackingGroup builds an empty tracking group with the given name,
// registers it in allTrackingGroups (replacing any group already stored under
// that name), and returns it.
func createTrackingGroup(name string) *trackingGroup {
	tg := &trackingGroup{
		name:   name,
		active: make(map[int]execRecord),
	}
	// start with an empty, non-nil sample list
	tg.finished.samples = []float64{}

	allTrackingGroups[name] = tg
	return tg
}
// startRecExec records, in the tracking group named tgName, the start of the
// execution trace identified by execID: the id of the comp pattern and the name
// of the func that initiated it, and the time it began. The tracking group is
// created on first use. Starting an execID that has already been started is a
// programming error and panics.
func startRecExec(tgName string, execID, cpID int, funcName string, time float64) {
	// an execID may be started only once
	if _, started := execIDToTG[execID]; started {
		panic(fmt.Errorf("attempt to start already started tracking on execID %d", execID))
	}

	// find the tracking group, creating it if this is its first execution
	tg, exists := allTrackingGroups[tgName]
	if !exists {
		tg = createTrackingGroup(tgName)
	}

	execIDToTG[execID] = tg
	tg.active[execID] = execRecord{cpID: cpID, src: funcName, start: time}
}
// EndRecExec computes the completed execution time of the execution identified
// by execID, given the ending time, and incorporates it into the statistics
// summary of the tracking group the execution was started in (see startRecExec).
// The execution is removed from the group's active set and its execution time
// is returned.
//
// EndRecExec panics if execID was never started with startRecExec.
//
// NOTE(review): the execID stays in execIDToTG after it ends, so a second call
// for the same execID finds no active record and measures from time zero;
// confirm whether repeated calls (e.g. after branching) are expected.
func EndRecExec(execID int, time float64) float64 {
	// make sure we have a tracking group for this execID
	tg, present := execIDToTG[execID]
	if !present {
		panic(fmt.Errorf("failure to find tracking group defined for execID %d", execID))
	}

	rtn := time - tg.active[execID].start
	delete(tg.active, execID)

	fin := &tg.finished

	// The first sample defines both extremes. Comparing against the zero values
	// the summary starts with would pin minv at 0 for positive execution times.
	if fin.completed == 0 {
		fin.maxv, fin.minv = rtn, rtn
	} else {
		fin.maxv = math.Max(fin.maxv, rtn)
		fin.minv = math.Min(fin.minv, rtn)
	}

	fin.n++
	fin.completed++
	fin.samples = append(fin.samples, rtn)
	fin.sum += rtn
	fin.sum2 += rtn * rtn
	return rtn
}
// cleanUp forgets every execution still listed as active on this instance.
// The active map itself is kept and emptied in place.
func (cpi *CmpPtnInst) cleanUp() {
	for id := range cpi.active {
		delete(cpi.active, id)
	}
}
// ExecReport reports delay times of completed cmpPtnInst executions.
// It first discards the instance's active executions (cleanUp), then returns the
// samples of an entry of cpi.finished that has recorded at least one execution.
// When several entries qualify, the one returned depends on map iteration order.
// If no entry qualifies an empty, non-nil slice is returned.
func (cpi *CmpPtnInst) ExecReport() []float64 {
	cpi.cleanUp()

	for _, summary := range cpi.finished {
		if summary.n <= 0 {
			continue
		}
		return summary.samples
	}
	return []float64{}

	// The statistical report below is retained from the original, disabled.
	/*
		if rec.n == 1 {
			fmt.Printf("%s initiating function %s measures %f seconds\n", cpi.name, src, rec.sum)
		} else {
			loss := 1.0 - float64(rec.completed)/float64(rec.n)
			N := float64(rec.completed)
			rtN := math.Sqrt(N)
			m := rec.sum / N
			sigma2 := (rec.sum2 - (rec.sum*rec.sum)/N) / (N - 1)
			if sigma2 < 0.0 {
				sigma2 = 0.0
			}
			sigma := math.Sqrt(sigma2)
			w := 1.96 * sigma / rtN
			fmt.Printf("%s initiating function %s mean is %f +/- %f seconds with 95 percent' confidence using %d samples, loss percentage %f\n",
				cpi.name, src, m, w, rec.n, loss)
		}
		}
	*/
}
// EndPts carries information on a CmpPtnMsg about where the execution thread started,
// and holds a place for a destination. CreateCmpPtnMsg fills both labels from its
// srt and end arguments.
type EndPts struct {
SrtLabel string // label where the execution thread started
EndLabel string // label of the destination
}
// A CmpPtnMsg struct describes a message going from one CompPattern function to another.
// It carries ancillary information about the message that is included for referencing.
type CmpPtnMsg struct {
ExecID int // initialized when an initiating comp pattern message is created. Carried by every resulting message.
SrcCP string // name of comp pattern instance from which the message originates
NxtCP string // name of comp pattern instance to which the message should be shifted en-route to DstCP
DstCP string // name of comp pattern instance to which the message is ultimately directed
PrevCPID int // ID of the comp pattern through which the message most recently passed
Edge CmpPtnGraphEdge // edge the message travels; its SrcLabel, DstLabel and the MsgType select the receiving method
// do we need/want CmpHdr?
CmpHdr EndPts
MsgType string // describes function of message
MsgLen int // number of bytes
PcktLen int // parameter impacting execution time
Rate float64 // when non-zero, a rate limiting attribute that might be used, e.g., in modeling IO
Start bool // start the timer
Payload any // free for "something else" to carry along and be used in decision logic
}
// carriesPckt indicates whether the message conveys information about a packet
// or a flow: it reports true only when both the message length and the packet
// length are positive.
func (cpm *CmpPtnMsg) carriesPckt() bool {
	if cpm.MsgLen <= 0 {
		return false
	}
	return cpm.PcktLen > 0
}
// CreateCmpPtnMsg is a constructor whose arguments are all the attributes needed to make a CmpPtnMsg.
// One is created and returned.
//
// srt and end become the labels of the message's CmpHdr. PrevCPID is set to the
// id of the comp pattern instance named srcCP. NxtCP is left empty and Start is
// false.
//
// CreateCmpPtnMsg panics if no comp pattern instance named srcCP has been created.
func CreateCmpPtnMsg(edge CmpPtnGraphEdge, srt, end string, msgType string,
	msgLen int, pcktLen int, rate float64, srcCP, dstCP string,
	payload any, execID int) CmpPtnMsg {

	// look up the instance of SrcCP; its ID is needed on the message.
	// Name the missing pattern rather than fail on a nil dereference.
	cpi := CmpPtnInstByName[srcCP]
	if cpi == nil {
		panic(fmt.Errorf("comp pattern instance %s not found when creating message", srcCP))
	}

	return CmpPtnMsg{
		ExecID:   execID,
		SrcCP:    srcCP,
		DstCP:    dstCP,
		PrevCPID: cpi.ID,
		Edge:     edge,
		CmpHdr:   EndPts{SrtLabel: srt, EndLabel: end},
		MsgType:  msgType,
		MsgLen:   msgLen,
		PcktLen:  pcktLen,
		Rate:     rate,
		Start:    false,
		Payload:  payload,
	}
}
// InEdgeMsg returns the InEdge structure embedded in a CmpPtnMsg: the message
// type, together with the source label of the message's edge when the source
// and destination comp patterns are the same, or the placeholder "***" when
// the message crosses comp patterns.
func InEdgeMsg(cmp *CmpPtnMsg) InEdge {
	// code for 'external', used unless the message stays inside one comp pattern
	srcLabel := "***"
	if cmp.SrcCP == cmp.DstCP {
		srcLabel = cmp.Edge.SrcLabel
	}
	return InEdge{SrcLabel: srcLabel, MsgType: cmp.MsgType}
}
// CmpPtnFuncInstHost returns the name of the host to which the CmpPtnFuncInst given as
// argument is mapped, found in CmpPtnMapDict under the function's comp pattern
// and label.
func CmpPtnFuncInstHost(cpfi *CmpPtnFuncInst) string {
	label := cpfi.funcLabel()
	ptnMap := CmpPtnMapDict.Map[cpfi.funcCmpPtn()]
	return ptnMap.FuncMap[label]
}
// FuncExecTime returns the increase in execution time resulting from executing the
// CmpPtnFuncInst offered as argument, to the message also offered as argument.
//
// The timing is looked up in funcExecTimeTbl by operation (op), by the CPU model of
// the endpoint the function is mapped to, and by the message's PcktLen. If the
// pcktlen of the message does not exactly match the pcktlen parameter of a func
// timing entry, an interpolation or extrapolation of existing entries is performed:
//   - with a single measurement, the 'time per byte' of that measurement is scaled to
//     the message's pcktlen (a measurement taken at length 0 is returned as is, since
//     it has no per-byte rate);
//   - with two or more measurements, a least-squares line through them is evaluated
//     at the message's pcktlen.
//
// FuncExecTime panics if there are no timings for op, or none for op on the
// function's hardware.
func FuncExecTime(cpfi *CmpPtnFuncInst, op string, msg *CmpPtnMsg) float64 {
	// get the parameters needed for the func execution time lookup
	hostLabel := CmpPtnMapDict.Map[cpfi.funcCmpPtn()].FuncMap[cpfi.funcLabel()]
	hardware := netportal.EndptCPUModel(hostLabel)

	// if we don't have an entry for this function type, complain
	hwMap, present := funcExecTimeTbl[op]
	if !present {
		panic(fmt.Errorf("no function timing look up for operation %s", op))
	}

	// if we don't have any timing for the named CPU for this function type, complain.
	// An empty table is as useless as a missing one: there is nothing to estimate from.
	lenMap := hwMap[hardware]
	if len(lenMap) == 0 {
		panic(fmt.Errorf("no function timing look for operation %s on hardware %s", op, hardware))
	}

	// lenMap associates an execution time (float64) with a packet length (int).
	// If we have the length, just return the timing.
	if timing, exact := lenMap[msg.PcktLen]; exact {
		return timing
	}

	pcktLen := float64(msg.PcktLen)

	// if there is only one point, extrapolate from time per unit length
	if len(lenMap) == 1 {
		for knownLen, timing := range lenMap {
			if knownLen == 0 {
				// no per-unit rate can be derived from a zero-length measurement
				return timing
			}
			timePerUnit := timing / float64(knownLen)
			return pcktLen * timePerUnit
		}
	}

	// Two or more points: do a linear regression. The keys of a map are distinct,
	// so the x values differ and the denominator below is non-zero.
	var sumX, sumX2, sumY, sumXY float64
	for knownLen, timing := range lenMap {
		x := float64(knownLen)
		sumX += x
		sumX2 += x * x
		sumY += timing
		sumXY += x * timing
	}
	N := float64(len(lenMap))
	m := (N*sumXY - sumX*sumY) / (N*sumX2 - sumX*sumX)
	b := (sumY - m*sumX) / N

	return pcktLen*m + b
}
// EnterFunc is an event-handling routine, scheduled by an evtm.EventManager to execute and simulate the results of
// a message arrival to a CmpPtnInst function. The particular CmpPtnFuncInst (cpFunc) and particular message
// (cpMsg, a *CmpPtnMsg) are given as arguments to the function. A type-unspecified return is provided;
// it is always nil.
//
// When the function is an initiating one and no message is supplied, a copy of the function's InitMsg
// is used, given the next execution id and message type "initiate", and flagged to start the timer.
// A message flagged with Start begins the recording of its execution (startRecExec). The method code
// for the message is found from (previous CP id, edge source label, message type); a message with no
// such entry is reported and ignored. Otherwise the method's Start function is called, and if the
// function is no longer active afterwards the recording of the execution is ended (EndRecExec).
//
// EnterFunc panics if no message is supplied to a function that is not initiating, or if an
// initiating function has no InitMsg to copy.
func EnterFunc(evtMgr *evtm.EventManager, cpFunc any, cpMsg any) any {
	// extract the CmpPtnFuncInst and CmpPtnMsg involved in this event
	cpfi := cpFunc.(*CmpPtnFuncInst)

	// see if this function is active, if not, bye
	if !cpfi.funcActive() {
		return nil
	}

	initiating := cpfi.isInitiating()

	var cpm *CmpPtnMsg
	if cpMsg != nil {
		cpm = cpMsg.(*CmpPtnMsg)
	}

	endptName := cpfi.host
	endpt := mrnes.EndptDevByName[endptName]

	// only an initiating function can make up its own message; name the problem
	// here rather than fail below on a nil dereference
	if cpm == nil && !initiating {
		panic(fmt.Errorf("function %s entered without a message but is not an initiating function",
			cpfi.funcLabel()))
	}

	// need to create an initial message?
	if initiating && cpm == nil {
		if cpfi.InitMsg == nil {
			panic(fmt.Errorf("initiating function %s has no initial message", cpfi.funcLabel()))
		}
		cpm = new(CmpPtnMsg)
		*cpm = *cpfi.InitMsg

		// flag to start the timer on this execution thread
		cpm.Start = true
		cpm.ExecID = numExecThreads
		cpm.MsgType = "initiate"
		numExecThreads += 1

		if cpfi.funcTrace() {
			// work out the endpoint device and log the entry there
			traceMgr.AddTrace(evtMgr.CurrentTime(), cpm.ExecID, 0, endpt.DevID(), "enter", cpm.carriesPckt(), cpm.Rate)
			traceMgr.AddTrace(evtMgr.CurrentTime(), cpm.ExecID, 0, cpfi.ID, "enter", cpm.carriesPckt(), cpm.Rate)
		}
	} else if cpfi.funcTrace() {
		traceMgr.AddTrace(evtMgr.CurrentTime(), cpm.ExecID, 0, cpfi.ID, "enter", cpm.carriesPckt(), cpm.Rate)
	}

	// start the timer if required
	if cpm.Start {
		traceMgr.AddTrace(evtMgr.CurrentTime(), cpm.ExecID, 0, endpt.DevID(), "enter", cpm.carriesPckt(), cpm.Rate)
		cpi := CmpPtnInstByName[cpfi.funcCmpPtn()]
		traceName := cpi.name + ":" + cpfi.funcLabel()
		startRecExec(traceName, cpm.ExecID, cpfi.CPID, cpfi.funcLabel(), evtMgr.CurrentSeconds())
		cpm.Start = false
	}

	// get the method code associated with the message arrival from the named source
	es := edgeStruct{CPID: cpm.PrevCPID, FuncLabel: cpm.Edge.SrcLabel, MsgType: cpm.MsgType}
	methodCode, present := cpfi.inEdgeMethodCode[es]
	if !present {
		fmt.Printf("function %s receives unrecognized input message type %s, ignored\n",
			cpfi.funcLabel(), cpm.MsgType)
		return nil
	}

	// get the functions that start, and stop the function execution
	methods := cpfi.respMethods[methodCode]
	methods.Start(evtMgr, cpfi, methodCode, cpm)

	// if function not now active stop the collection of information about the execution thread
	if !cpfi.funcActive() {
		EndRecExec(cpm.ExecID, evtMgr.CurrentSeconds())
	}
	return nil
}
// EmptyInitFunc is an event handler that does nothing and returns nil. It exists
// to detect when there is actually an initialization event handler (by having
// 'emptyInitFunc' below be re-written to point to something else).
func EmptyInitFunc(evtMgr *evtm.EventManager, cpFunc any, cpMsg any) any {
return nil
}
// emptyInitFunc holds EmptyInitFunc as an evtm.EventHandlerFunction value.
var emptyInitFunc evtm.EventHandlerFunction = EmptyInitFunc
// ExitFunc is an event handling routine that implements the scheduling of messages which result from the completed
// (simulated) execution of a CmpPtnFuncInst. The CmpPtnFuncInst and the message that triggered the execution
// are given as arguments to ExitFunc. This routine asks the CmpPtnFuncInst for the responses to the execution,
// a slice of messages which are then pushed further along the CompPattern chain: a message whose next function
// is on the same host is scheduled to enter it without delay, any other is handed to the network.
// When there are no responses, or the triggering message's edge has no destination label, the execution is
// treated as finished and its recording is ended. The return value is always nil.
func ExitFunc(evtMgr *evtm.EventManager, cpFunc any, cpMsg any) any {
	// extract the CmpPtnFuncInst and CmpPtnMsg involved in this event
	cpfi := cpFunc.(*CmpPtnFuncInst)
	cpm := cpMsg.(*CmpPtnMsg)

	// get the response(s), if any
	msgs := cpfi.funcResp(cpm.ExecID)

	// note exit from function
	if cpfi.funcTrace() {
		traceMgr.AddTrace(evtMgr.CurrentTime(), cpm.ExecID, 0, cpfi.ID, "exit", cpm.carriesPckt(), cpm.Rate)
	}

	// see if we're done: nothing to send on, or nowhere to send it
	if len(msgs) == 0 || cpm.Edge.DstLabel == "" {
		endpt := mrnes.EndptDevByName[cpfi.host]
		traceMgr.AddTrace(evtMgr.CurrentTime(), cpm.ExecID, 0, endpt.DevID(), "exit", cpm.carriesPckt(), cpm.Rate)
		EndRecExec(cpm.ExecID, evtMgr.CurrentSeconds())
		return nil
	}

	// the comp pattern the function being exited belongs to
	homeCPI := CmpPtnInstByName[cpfi.funcCmpPtn()]

	// treat each msg individually
	for _, msg := range msgs {
		// the next function normally lives in the same comp pattern, but allow
		// for the possibility that the next comp pattern is different
		target := homeCPI
		if msg.NxtCP != "" && msg.SrcCP != msg.NxtCP {
			target = CmpPtnInstByName[msg.NxtCP]
			msg.NxtCP = ""
		}

		// The target comp pattern should have a representation for the next function,
		// named by the destination label of the edge embedded in the message. When the
		// destination CP differs from the source, the code which created msg is expected
		// to have put in the edge the label used within the destination CP.
		nxtf, present := target.funcs[msg.Edge.DstLabel]
		if !present {
			panic("missing dstLabel in CmpPtnInst description")
		}

		dstHost := CmpPtnMapDict.Map[target.name].FuncMap[nxtf.Label]

		// to get to a different host we need to go through the network
		if cpfi.host != dstHost {
			netportal.EnterNetwork(evtMgr, cpfi.host, dstHost, msg.MsgLen, cpm.ExecID, msg.carriesPckt(), msg.Rate, msg,
				nxtf, ReEnter, target, LostCmpPtnMsg)
			continue
		}

		// staying on the host means scheduling w/o delay the arrival at the next func
		evtMgr.Schedule(nxtf, msg, EnterFunc, vrtime.SecondsToTime(0.0))
	}
	return nil
}
// ReEnter is an event handler that schedules EnterFunc, with zero delay, for the
// function (cpFunc) and message (msg) it is given, and returns nil. ExitFunc hands
// it to netportal.EnterNetwork together with the next function.
// NOTE(review): presumably it is invoked when the message reaches the destination host; confirm in netportal.
func ReEnter(evtMgr *evtm.EventManager, cpFunc any, msg any) any {
evtMgr.Schedule(cpFunc, msg, EnterFunc, vrtime.SecondsToTime(0.0))
return nil
}
// LostCmpPtnMsg is scheduled from the mrnes side to report the loss of a comp pattern message.
// msg is the lost *CmpPtnMsg; context is not used. The count of active messages for the
// message's execution id, kept by the destination comp pattern, is decremented, and the loss
// is printed when that count reaches zero. The return value is always nil.
func LostCmpPtnMsg(evtMgr *evtm.EventManager, context any, msg any) any {
	lost := msg.(*CmpPtnMsg)

	// the destination comp pattern keeps the books for this execution
	cpName, execID := lost.DstCP, lost.ExecID
	cpi := CmpPtnInstByName[cpName]

	cpi.activeCnt[execID]--

	// if no other msgs active for this execID report loss
	if cpi.activeCnt[execID] == 0 {
		fmt.Printf("Comp Pattern %s lost message for execution id %d\n", cpName, execID)
	}
	return nil
}
// numExecThreads supplies the unique integer code placed on every newly created
// initiation message; EnterFunc increments it each time it uses one.
var numExecThreads = 1
// schedInitEvts goes through all CmpPtnInsts and, for every CmpPtnFuncInst
// that has an initialization event handler, schedules that handler with the
// event manager at zero delay and with no message.
func schedInitEvts(evtMgr *evtm.EventManager) {
	for _, inst := range CmpPtnInstByName {
		for _, fn := range inst.funcs {
			// skip funcs that do not call for an initialization handler
			if fn.funcInitEvtHdlr() == nil {
				continue
			}
			evtMgr.Schedule(fn, nil, fn.funcInitEvtHdlr(), vrtime.SecondsToTime(0.0))
		}
	}
}