-
Notifications
You must be signed in to change notification settings - Fork 98
/
BlynkTimer.py
129 lines (104 loc) · 3.32 KB
/
BlynkTimer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
BlynkTimer
blynk-library-python
Module for launching timed Blynk actions
with polling
"""
import time
class BlynkTimer:
    '''Executes functions after a defined period of time.

    Timers are polled: nothing fires unless run() is called repeatedly
    (typically from the application's main loop).

    Methods taking a timerId treat an id that does not belong to a
    registered timer as "nothing to do" instead of failing, so it is safe to
    call disable()/is_enabled() for a one-shot timeout that has already
    fired and removed itself.
    '''

    # Maximum number of registered timers; also the exclusive upper bound of
    # the id range handed out by _get_unique_id().
    _MAX_TIMERS = 16

    def __init__(self):
        self.timers = []
        self.ids = self._get_unique_id()

    def _get_unique_id(self, current=0):
        '''Yields ids in [current, _MAX_TIMERS) not held by a live timer.

        The "is this id in use" check happens lazily, each time the
        generator is resumed, so it always reflects the current contents of
        self.timers.
        '''
        numId = current
        while numId < self._MAX_TIMERS:
            if self._get(numId) is None:
                yield numId
            numId += 1

    def _add(self, func, **kwargs):
        '''Creates a Timer for func, registers it and returns it.

        Keyword arguments are forwarded to the Timer constructor.
        Raises RuntimeError when no free timer id is left.
        '''
        try:
            timerId = next(self.ids)
        except StopIteration:
            # Do not let StopIteration escape: it would silently end a
            # caller's generator or iterator instead of reporting the error.
            timerId = None
        if timerId is None:
            raise RuntimeError(
                'no free timer slot (max %d timers)' % self._MAX_TIMERS)
        timer = Timer(timerId, func, **kwargs)
        self.timers.append(timer)
        return timer

    def _get(self, timerId):
        '''Gets timer by id; returns None when no timer has that id.'''
        for timer in self.timers:
            if timer.id == timerId:
                return timer
        return None

    def _delete(self, timerId):
        '''Disables and removes the timer; returns timerId.

        Unknown ids are ignored.
        '''
        timer = self._get(timerId)
        if timer is None:
            return timerId
        timer.disable()
        # Rebind instead of mutating in place so that a loop currently
        # iterating the previous list is not disturbed.
        self.timers = [t for t in self.timers if t.id != timerId]
        # Restart the id scan from 0 so the freed id can be reused. The
        # generator skips ids still in use, so no duplicate id is handed out.
        self.ids = self._get_unique_id()
        return timerId

    def get_num_timers(self):
        '''Returns a (used timer slots, maximum timer slots) tuple.'''
        num_timers = len(self.timers)
        return (num_timers, self._MAX_TIMERS)

    def is_enabled(self, timerId):
        '''Returns true if timer is enabled; False for an unknown id.'''
        timer = self._get(timerId)
        if timer is None:
            return False
        return timer.enabled

    def set_interval(self, value, func):
        '''Calls func repeatedly, every `value` time units.

        Returns the id of the new timer.
        Raises RuntimeError when no free timer id is left.
        '''
        timer = self._add(func)
        timer.set_interval(value)
        return timer.id

    def set_timeout(self, value, func):
        '''Runs func once after `value` time units, then removes the timer.

        Returns the id of the new timer.
        Raises RuntimeError when no free timer id is left.
        '''
        # post_run=self._delete makes the timer unregister itself after its
        # first (and only) run.
        timer = self._add(func, post_run=self._delete)
        timer.set_interval(value)
        return timer.id

    def enable(self, timerId):
        '''Enables timer; unknown ids are ignored. Returns timerId.'''
        timer = self._get(timerId)
        if timer is not None:
            timer.enable()
        return timerId

    def disable(self, timerId):
        '''Disables timer; unknown ids are ignored. Returns timerId.'''
        timer = self._get(timerId)
        if timer is not None:
            timer.disable()
        return timerId

    def run(self):
        '''Polls timers, running every callback that is due.'''
        # Iterate over a snapshot: callbacks may add or delete timers while
        # the poll is in progress.
        for timer in list(self.timers):
            timer.run()
class Timer:
    '''Runs a function each time a given interval has elapsed.

    The timer is passive: run() must be polled, and the function is called
    from inside run() once more than `interval` has passed since
    `start_time`. Times are measured with time.time().
    '''

    def __init__(self, id, func, **kwargs):
        '''Creates a disabled timer.

        id: identifier passed to the post-run hook.
        func: callable invoked without arguments when the timer fires.
        post_run (keyword, optional): callable invoked with the timer id
            after each run of func.
        '''
        self.id = id
        self.func = func
        self.interval = None      # set by set_interval()
        self.start_time = None    # time.time() reference of current period
        self.enabled = False
        self.on_post_run = kwargs.get('post_run', None)

    def _handle_post_run(self):
        '''Schedules the next period and calls the post-run hook, if any.

        Returns whatever the hook returns, or None when there is no hook.
        '''
        # func may have disabled this timer (start_time is None then);
        # there is no next period to schedule in that case.
        if self.start_time is not None and self.interval is not None:
            # Advance by exactly one interval rather than resetting to
            # "now", so the period is not stretched by polling delay.
            self.start_time += self.interval
        if self.on_post_run:
            return self.on_post_run(self.id)
        return None

    def enable(self):
        '''Enables the timer and starts a new period from now.'''
        self.enabled = True
        self.start_time = time.time()

    def disable(self):
        '''Disables the timer and clears its period reference.'''
        self.enabled = False
        self.start_time = None

    def set_interval(self, value):
        '''Sets the time interval for calling the function and enables it.

        value is compared against differences of time.time() readings.
        '''
        self.interval = value
        self.enable()

    def run(self):
        '''Runs function if interval has passed'''
        # A timer that was enabled without an interval, or that has no
        # period reference, has nothing to compare against yet.
        if (not self.enabled or self.interval is None
                or self.start_time is None):
            return
        now = time.time()
        if now - self.start_time > self.interval:
            self.func()
            self._handle_post_run()