-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.js
executable file
·124 lines (108 loc) · 3.13 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Redis client library and the Bluebird promise library.
const redis = require('redis');
const bluebird = require('bluebird');
// Promisify the redis module so the `*Async` method variants used in this
// file (xaddAsync, xreadAsync, ...) are available.
bluebird.promisifyAll(redis);
// Single shared client used by genRnd and FeedsIterator. No options are
// passed, so the library's default connection settings apply.
let client = redis.createClient();
// client.getAsync('name').then(function(res) {
// console.log(res); // => 'bar'
// });
/* client.xaddAsync('mystream','*','age',23).then(function(res){
console.log(res);
}); */
/*
client.xrangeAsync('mystream','-','+').then(function(res){
res.forEach(element => {
element.forEach((element,index)=>{
if(index === 0){
console.log(`Timestamp: ${element}`);
}else{
element.forEach(element=>{
console.log(element)
})
}
})
});
});
*/
/* client.XGROUPAsync('DESTROY','mystream','consumer1').then(res=>{
console.log(res);
})
client.XGROUPAsync('CREATE','mystream','consumer1','0').then(res=>{
console.log(res);
})
client.XINFOAsync('GROUPS','mystream').then(res=>{
console.log(res)
})*/
/*
client.xreadAsync('count',1,'Streams','mystream',0-0).then(res=>{
res.forEach(element => {
element.forEach((element,index)=>{
if(index === 0){
console.log(`Stream Name: ${element}`);
}else{
element.forEach((element,index)=>{
element.forEach((element,index)=>{
if(index === 0){
console.log(`Timestamp: ${element}`);
}else if(index === 1){
element.forEach(element=>{
console.log(element)
})
}
})
})
}
})
});
})
client.onAsync('connect',()=>{
console.log('Server started......')
});
client.onAsync('error',(err)=>{
console.log(`Something went wrong ${err}`)
});
client.quit(); */
/**
 * Generates random integers and appends each one to the `mystream` Redis
 * stream as an entry with a single `rnd` field.
 *
 * Entries are added one at a time (each XADD is awaited before the next is
 * sent). `MAXLEN ~ 1000` asks Redis to keep the stream trimmed to roughly
 * 1000 entries.
 *
 * @param {number} [count=10] - How many entries to insert.
 * @returns {Promise<void>} Resolves once every entry has been added; rejects
 *   with the client's error if an XADD fails.
 */
async function genRnd(count = 10) {
  // The counter must advance on every pass; without the increment the loop
  // never terminates and floods Redis with writes.
  for (let inserted = 0; inserted < count; inserted++) {
    const value = Math.floor(Math.random() * 10000);
    await client.xaddAsync('mystream', 'MAXLEN', '~', '1000', '*', 'rnd', value);
  }
}
/**
 * Generator that walks an indexable, length-bearing value (such as the array
 * of entries read from Redis) from index 0 upwards, yielding each element.
 *
 * `val.length` is re-read on every step, so elements appended while the
 * generator is being consumed are also yielded.
 *
 * @param {ArrayLike<*>} val - Value exposing `length` and numeric indices.
 * @yields {*} The element at each successive index.
 */
function* myIterator(val) {
  let position = 0;
  while (position < val.length) {
    yield val[position];
    position += 1;
  }
}
/**
 * Reads up to 20 entries from a Redis stream with XREAD and resolves to the
 * list of entries for that stream.
 *
 * @param {Object} [options] - Read options; may be omitted entirely.
 * @param {string} [options.name="mystream"] - Stream key to read from.
 * @param {number|string} [options.blockMs=0] - When truthy, the read is sent
 *   with `BLOCK <blockMs>`; when falsy (the default) no BLOCK clause is sent
 *   and the read is non-blocking.
 * @param {string} [options.check='0-0'] - Only entries with an ID greater
 *   than this one are returned.
 * @returns {Promise<Array>} Resolves to the stream's entries (each
 *   `[id, [field, value, ...]]`), or to an empty array when the reply carries
 *   no entries.
 */
const FeedsIterator = function ({name = "mystream", blockMs = 0, check = '0-0'} = {}) {
  const readArgs = ["count", "20", 'STREAMS', name, check];
  const command = blockMs ? ["BLOCK", `${blockMs}`, ...readArgs] : readArgs;
  return client
    .xreadAsync(...command)
    .then((reply) => {
      // XREAD replies with nil (null here) when nothing is available or the
      // BLOCK timeout expires; destructuring null would throw, so treat it
      // as "no entries".
      const [[, entries = []] = []] = reply || [];
      return entries;
    });
};
//FeedsIterator({});
/**
 * Reads batches of stream entries through FeedsIterator and logs the ID and
 * the second element of the field list of every entry, until a read resolves
 * to an empty batch.
 *
 * The ID of the last logged entry is carried forward as the `check` argument
 * of the next read, so each batch continues after the previous one.
 *
 * NOTE(review): only the first read passes `blockMs`; follow-up reads pass
 * `check` alone and therefore use FeedsIterator's default for `blockMs` -
 * confirm this is intended.
 *
 * @param {string} [check='0-0'] - ID after which reading starts.
 * @param {number|string} [blockMs='20000'] - `blockMs` value passed to the
 *   first read.
 * @returns {Promise<void>} Resolves when a read yields an empty batch.
 */
const execute = async function(check = '0-0',blockMs = '20000') {
  let streams = await FeedsIterator({blockMs,check});
  while (streams.length > 0) {
    for (const entry of myIterator(streams)) {
      // entry is [id, fieldList]; `data` is the second item of fieldList
      // (undefined when the field list is missing).
      const [entryId, [, data] = []] = entry;
      check = entryId;
      console.log(`Id: ${check}`);
      console.log(`Data: ${data}`);
    }
    streams = await FeedsIterator({check});
  }
};
// Start the producer (genRnd) and the consumer (execute). Both return
// promises; attach a rejection handler to each so a Redis or parsing failure
// is reported and reflected in the exit code instead of surfacing as an
// unhandled rejection.
const reportFailure = (task) => (err) => {
  console.error(`${task} failed: ${err}`);
  process.exitCode = 1;
};
genRnd().catch(reportFailure('genRnd'));
execute().catch(reportFailure('execute'));