-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.cpp
385 lines (283 loc) · 12.7 KB
/
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
//-----------------------------------------------------------------------------
/*!
\file
\brief A simple Observable implementation.
Let’s write our own Observable interface implementation to understand what’s going on under the hood when we work with RxJS.
An observable is just a function. This function takes in an observer as an argument, and returns a subscription object.
An observer is just an object with three methods: next, which takes in a value; error, which takes in an error message; and
complete, which takes no arguments.
A subscription object represents a disposable resource, such as the execution of an Observable. This subscription has a
bunch of methods such as add and remove, but the most important one is unsubscribe which takes no argument and just disposes
the resource held by the subscription.
Based on: https://medium.com/@fknussel/a-simple-observable-implementation-c9c809c89c69
*/
//-----------------------------------------------------------------------------
/* -- Includes ------------------------------------------------------------ */
#include <stdint.h>
#include <iostream>
#include <string>
/* -- Defines ------------------------------------------------------------- */
using std::cout;
using std::endl;
using std::string;
/* -- Types --------------------------------------------------------------- */
//Interface of a consumer of the notifications emitted by an observable.
//  V: type of the values passed to next()
//  E: type of the error information passed to error()
template <typename V, typename E>
class Observer
{
public:
    //virtual destructor, so that a derived observer can be destroyed safely through an Observer pointer
    virtual ~Observer() {}

    //notification about a new value
    virtual void next(V const & value) = 0;

    //notification about an error
    virtual void error(E const & err) = 0;

    //notification that no further values will follow
    virtual void complete() = 0;
};
//Intermediate observer that is used in combination with the map functionality:
//it sits between a source observable and the observer that actually wants to get notified.
//Derived classes implement next/error/complete and decide what to pass on to "observer".
template <typename V, typename E>
class MappingObserver : public Observer<V,E>
{
public:
    //the actual observer that wants to get notified;
    //starts out as nullptr (instead of an indeterminate value) until somebody assigns it
    Observer<V,E> * observer = nullptr;
};
//Interface of the handle that is returned when subscribing to an observable.
class Subscription
{
public:
    //virtual destructor, so that an implementation can be destroyed safely through a Subscription pointer
    virtual ~Subscription() {}

    //releases whatever the subscription holds
    virtual void unsubscribe() = 0;
};
template <typename V, typename E>
class Observable : private Subscription //in this exampel, the Observable also implements the Subscription object...
{
private:
//typedef for a C++ pointer to a member function
//(that takes an pointer to an observer and returns an pointer to a subscription object)
typedef Subscription * (Observable::*SubscribeHandler)(Observer<V,E> * observer);
SubscribeHandler subscribeHandler;
MappingObserver<V,E> * mappingObserver;
Observable * mappingObservable;
V value;
V const * values;
size_t valuesCount;
E err;
private:
//constructor is private - as it shall not be called directly.
//instead, a factory function like "from", "of" or "throwError" shall be used!
Observable()
{
this->subscribeHandler = nullptr;
this->mappingObservable = nullptr;
this->values = nullptr;
this->valuesCount = 0;
this->err = nullptr;
}
//this is the method that is called when someone subscribes to the observable that was constructed...
//...using the "of" method
Subscription * subscribeHandler_of(Observer<V,E> * observer)
{
//call (the one and only) "next"
observer->next(value);
//finally complete
observer->complete();
//prevent further invocation, by setting the handler fuction to NULL (as the observable has completed now!)
subscribeHandler = nullptr;
//as Observable derives from Subscription it is very easy at this point to return an Subscription object
return this;
}
//this is the method that is called when someone subscribes to the observable that was constructed...
//...using the "from" method
Subscription * subscribeHandler_from(Observer<V,E> * observer)
{
//call one "next" after the other
for (size_t i = 0; i < valuesCount; i++) observer->next(values[i]);
//finally complete
observer->complete();
//prevent further invocation, by setting the handler fuction to NULL (as the observable has completed now!)
subscribeHandler = nullptr;
//as Observable derives from Subscription it is very easy at this point to return an Subscription object
return this;
}
//this is the method that is called when someone subscribes to the observable that was constructed...
//...using the "throwError" method
Subscription * subscribeHandler_throwError(Observer<V,E> * observer)
{
//call error
observer->error(err);
//finally complete
observer->complete();
//prevent further invocation, by setting the handler fuction to NULL (as the observable has completed now!)
subscribeHandler = nullptr;
//as Observable derives from Subscription it is very easy at this point to return an Subscription object
return this;
}
//this is the method that is called when someone subscribes to the observable that was constructed...
//... using the "map" method of another observable
Subscription * subscribeHandler_map(Observer<V,E> * observer)
{
mappingObserver->observer = observer;
return mappingObservable->subscribe(*mappingObserver);
//TBD return subscription object of the mapping-observable or of "this"???
}
//this method implements Subscription::unsubscribe
void unsubscribe()
{
//TBD!?
//clear all
this->values = nullptr;
this->valuesCount = 0;
this->err = nullptr;
}
public:
//factory function to construct a observable that emits a single value
static Observable * of(V value) //call by value
{
Observable * thiz = new Observable();
thiz->subscribeHandler = &Observable::subscribeHandler_of;
thiz->value = value; //store a copy the value
return thiz;
}
//factory function to construct a observable that emits a series of values
static Observable * from(V const * values, size_t count) //values is pointer to (array of) const V(s)
{
Observable * thiz = new Observable();
thiz->subscribeHandler = &Observable::subscribeHandler_from;
thiz->values = values; //store pointer to the values
thiz->valuesCount = count;
return thiz;
}
//factory function to construct a observable that emits an error
static Observable * throwError(E err) //call by value
{
Observable * thiz = new Observable();
thiz->subscribeHandler = &Observable::subscribeHandler_throwError;
thiz->err = err; //make a copy
return thiz;
}
//create a new Observable, that emits the "next-values" of this stream transformed by the given transformation function
Observable * map(MappingObserver<V,E> & mappingObserver)
{
Observable * newobs = new Observable();
newobs->subscribeHandler = &Observable::subscribeHandler_map;
newobs->mappingObserver = &mappingObserver;
newobs->mappingObservable = this;
return newobs;
}
//this method is a wrapper to call the respective subscribe handler method, set at construction
Subscription * subscribe(Observer<V,E> & observer)
{
//if the observable hasn't completed yet...
if (this->subscribeHandler != nullptr)
{
//use c++ function-/method-pointer to...
return (this->*subscribeHandler)(&observer); //...call either subscribeHandler_of/.._from/.._throwError
}
//otherwise: the observable has already completed
//subscription has no further effect, than just returning an Subscription object
return this; //as Observable derives from Subscription it is very easy at this point to return an Subscription object
}
};
/* -- (Module) Global Variables ------------------------------------------- */
/* -- Implementation ------------------------------------------------------ */
//Demo observer for int values whose errors are reported as C strings.
//Every notification is written to standard output, prefixed with the id given at construction.
class IntObserver : public Observer<int, char const *>
{
public:
    IntObserver(std::string id) : id(id)
    {
    }

    //prints "<id>: <value>"
    void next(int const & value)
    {
        prefixed() << value << std::endl;
    }

    //prints "<id>: <error text>"
    void error(char const * const & err)
    {
        prefixed() << err << std::endl;
    }

    //prints "<id>: complete!"
    void complete()
    {
        prefixed() << "complete!" << std::endl;
    }

private:
    //writes the "<id>: " prefix and hands back the stream for the rest of the line
    std::ostream & prefixed() const
    {
        return std::cout << id << ": ";
    }

    std::string id;
};
//Demo mapping observer: passes on every second value (starting with the first one) multiplied by two;
//error and complete notifications are handed to the actual subscriber unchanged.
class IntMapObserver : public MappingObserver<int, char const *>
{
private:
    bool forward; //true, if the next incoming value shall be passed on

public:
    IntMapObserver() : forward(true)
    {
    }

    void next(int const & value)
    {
        if (forward)
        {
            int const doubled = 2 * value;
            this->observer->next(doubled); //hand the doubled value to the subscriber
        }

        //flip after each incoming value, so that only every other value gets through
        forward = !forward;
    }

    void error(char const * const & err)
    {
        this->observer->error(err); //error is passed on as it is
    }

    void complete()
    {
        this->observer->complete(); //completion is passed on as it is
    }
};
//alias for an "Integer-Observable" (emits int values and reports errors as C strings)
using IntObservable = Observable<int, const char *>;
int main(int argc, char * argv[])
{
IntObserver myIntObserver("IntObs");
Subscription * mySubscription;
cout << "--------------- TEST CASE 'of' ---------------" << endl;
cout << "Creating a Single-Integer-Observable, that emits a single value before it completes." << endl;
IntObservable * singleIntObservable = IntObservable::of(1);
cout << "Now I am going to subscribe to the Single-Integer-Observable." << endl;
mySubscription = singleIntObservable->subscribe(myIntObserver);
cout << "OK, just for testing: I am going to subscribe to the Single-Integer-Observable, a second time..." << endl;
cout << " But normally nothing should happen any more, as the observable should already be completed!" << endl;
mySubscription = singleIntObservable->subscribe(myIntObserver);
cout << "Now I am going to unsubscribe from the Single-Integer-Observable." << endl;
mySubscription->unsubscribe();
cout << endl;
cout << "--------------- TEST CASE 'from' ---------------" << endl;
cout << "Creating a Integer-Series-Observable, that emits a series of integer values before it completes." << endl;
int series[] = { 1, -2, 3, -4, 5, -6, 7 };
IntObservable * intSeriesObservable = IntObservable::from(series, 7);
cout << "Now I am going to subscribe to the Integer-Series-Observable." << endl;
mySubscription = intSeriesObservable->subscribe(myIntObserver);
cout << "OK, just for testing: I am going to subscribe to the Integer-Series-Observable, a second time..." << endl;
cout << " But normally nothing should happen any more, as the observable should already be completed!" << endl;
mySubscription = intSeriesObservable->subscribe(myIntObserver);
cout << "Now I am going to unsubscribe from that Integer-Series-Observable." << endl;
mySubscription->unsubscribe();
cout << endl;
cout << "--------------- TEST CASE 'map' ---------------" << endl;
cout << "Creating a Integer-Series-Observable, that emits a series of integer values before it completes." << endl;
intSeriesObservable = IntObservable::from(series, 7);
cout << "Map that Observable to another Observable by means of an inermediate mapping observer." << endl;
cout << "The mapping observable forwards only every seconde value. The forwarded value will be doubled." << endl;
IntMapObserver myMappingObserver;
IntObservable * mappedSeriesObservable = intSeriesObservable->map(myMappingObserver);
cout << "Now I am going to subscribe to the Mapped-Series-Observable." << endl;
mySubscription = mappedSeriesObservable->subscribe(myIntObserver);
cout << "OK, just for testing: I am going to subscribe to the Mapped-Series-Observable, a second time..." << endl;
cout << " But normally nothing should happen any more, as the observable should already be completed!" << endl;
mySubscription = mappedSeriesObservable->subscribe(myIntObserver);
cout << "Now I am going to unsubscribe from that Integer-Series-Observable." << endl;
mySubscription->unsubscribe();
cout << endl;
cout << "--------------- TEST CASE 'throwError' ---------------" << endl;
cout << "Creating a Error-Observable, that emits an error text (c-string) before it completes." << endl;
IntObservable * errorObservable = IntObservable::throwError("An error occured!");
cout << "Now I am going to subscribe to the Error-Observable." << endl;
mySubscription = errorObservable->subscribe(myIntObserver);
cout << "OK, just for testing: I am going to subscribe to the Error-Observable, a second time..." << endl;
cout << " But normally nothing should happen any more, as the observable should already be completed!" << endl;
mySubscription = errorObservable->subscribe(myIntObserver);
cout << "Now I am going to unsubscribe from that Error-Observable." << endl;
mySubscription->unsubscribe();
cout << endl;
cout << endl << "---END---" << endl;
return 0;
}