-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventPoller.h
96 lines (80 loc) · 3.16 KB
/
EventPoller.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//
// Created by ubuntu on 17-11-17.
//
#ifndef DISRUPTOR_PP_EVENTPOLLER_H
#define DISRUPTOR_PP_EVENTPOLLER_H
#include <cstdint>
#include <functional>
#include <memory>
#include "Interfaces/IDataProvider.h"
#include "Interfaces/ISequencer.h"
#include "Interfaces/ISequence.h"
#include "FixedSequenceGroup.h"
namespace Disruptor {
/// Result reported by a single EventPoller::Poll call.
enum PollState {
    /// At least one event was passed to the handler during the call.
    Processing = 0,
    /// No event was available to this poller, although the sequencer's
    /// cursor had already reached the next sequence it wants.
    Gating = 1,
    /// No event was available and the cursor had not reached the next sequence.
    Idle = 2
};
template <typename T>
class EventPoller {
Interfaces::IDataProvider<T> *_dataProvider;
Interfaces::ISequencer *_sequencer;
Interfaces::ISequence *_sequence;
Interfaces::ISequence *_gatingSequence;
public:
EventPoller(Interfaces::IDataProvider<T> *dataProvider,
Interfaces::ISequencer *sequencer,
Interfaces::ISequence *sequence,
Interfaces::ISequence *gatingSequence){
_dataProvider = dataProvider;
_sequencer = sequencer;
_sequence = sequence;
_gatingSequence = gatingSequence;
};
PollState Poll(std::function<bool(T, long, bool)> eventHandler) {
int64_t currentSequence = _sequence->GetValue();
int64_t nextSequence = currentSequence + 1;
int64_t availableSequence = _sequencer->GetHighestPublishedSequence(nextSequence, _gatingSequence->GetValue());
if(nextSequence <= availableSequence) {
bool processNextEvent;
int64_t processedSequence = currentSequence;
do {
T evt = _dataProvider[nextSequence];
processNextEvent = eventHandler(evt, nextSequence, nextSequence == availableSequence);
processedSequence = nextSequence;
nextSequence++;
} while (nextSequence <= availableSequence && processNextEvent);
_sequence->SetValue(processedSequence);
return PollState::Processing;
}
else if(_sequencer->GetCursor() >= nextSequence) {
return PollState::Gating;
}
else {
return PollState::Idle;
}
};
static EventPoller NewInstance(Interfaces::IDataProvider<T> *dataProvider,
Interfaces::ISequencer *sequencer,
Interfaces::ISequence *sequence,
Interfaces::ISequence *cursorSequence,
Interfaces::ISequence *gatingSequences, const int& gtLength) {
Interfaces::ISequence *gatingSequence;
//int gtLen = gatingSequences.size();
if(gtLength == 0) {
gatingSequence = cursorSequence;
}
else if(gtLength == 1) {
gatingSequence = gatingSequences;
}
else {
gatingSequence = FixedSequenceGroup(gatingSequences);
}
return EventPoller(dataProvider, sequencer, sequence, gatingSequence);
};
Interfaces::ISequence* GetSequence() {
return _sequence;
};
};
}
#endif //DISRUPTOR_PP_EVENTPOLLER_H