-
Notifications
You must be signed in to change notification settings - Fork 0
/
mq.py
45 lines (30 loc) · 1.3 KB
/
mq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from queue import Queue, SimpleQueue

from schema import Tick
class AbstractQueue:
    """Interface that queue implementations in this module are expected to fill in.

    Every method here, including the constructor, raises
    ``NotImplementedError`` with a message that names the concrete class, so
    a subclass that forgets to override one fails loudly when it is called.
    """

    def __init__(self, max_size):
        """Always raise; subclasses must supply their own constructor."""
        owner = self.__class__.__name__
        raise NotImplementedError(f"{owner} has not implemented __init__()")

    def enqueue(self, item):
        """Add ``item`` to the queue (must be overridden)."""
        owner = self.__class__.__name__
        raise NotImplementedError(f"{owner} has not implemented enqueue")

    def dequeue(self):
        """Remove and return an item from the queue (must be overridden)."""
        owner = self.__class__.__name__
        raise NotImplementedError(f"{owner} has not implemented dequeue")

    def isEmpty(self):
        """Report whether the queue holds no items (must be overridden)."""
        owner = self.__class__.__name__
        raise NotImplementedError(f"{owner} has not implemented isEmpty")

    def isFull(self):
        """Report whether the queue is at capacity (must be overridden)."""
        owner = self.__class__.__name__
        raise NotImplementedError(f"{owner} has not implemented isFull")
class DefaultQueue(AbstractQueue):
    """Bounded FIFO queue backed by ``queue.Queue`` with one shared timeout.

    The backing store is ``queue.Queue`` rather than ``queue.SimpleQueue``:
    ``SimpleQueue`` is unbounded, ignores the ``timeout`` passed to ``put``
    and has no ``full()`` method, so ``max_size`` was never applied and
    ``isFull()`` raised ``AttributeError``.
    """

    def __init__(self, max_size=100, timeout=60):
        """Create the queue.

        Args:
            max_size: Maximum number of items held at once. Following
                ``queue.Queue`` semantics, a value <= 0 means unbounded.
            timeout: Seconds that ``enqueue`` and ``dequeue`` may block.

        Raises:
            AssertionError: If either argument is not an ``int``. The check
                is an ``assert``, so it is skipped when Python runs with -O.
        """
        assert isinstance(max_size, int) and isinstance(timeout, int)
        # Queue(maxsize=...) is what makes max_size and isFull() meaningful.
        self.Q = Queue(maxsize=max_size)
        self.timeout = timeout

    def enqueue(self, item):
        """Put ``item`` on the queue, blocking up to ``self.timeout`` seconds.

        Returns:
            None (the return value of ``Queue.put``).

        Raises:
            queue.Full: If no slot became free within the timeout.
        """
        return self.Q.put(item, timeout=self.timeout)

    def dequeue(self):
        """Remove and return the oldest item, blocking up to ``self.timeout``.

        Raises:
            queue.Empty: If no item became available within the timeout.
        """
        return self.Q.get(timeout=self.timeout)

    def isEmpty(self):
        """Return True if the queue currently holds no items."""
        return self.Q.empty()

    def isFull(self):
        """Return True if the queue currently holds ``max_size`` items.

        Always False for an unbounded queue (``max_size`` <= 0).
        """
        return self.Q.full()
class TickQueue(DefaultQueue):
    """``DefaultQueue`` variant whose ``enqueue`` only accepts ``Tick`` items."""

    def enqueue(self, item):
        """Enqueue ``item`` after asserting that it is a ``Tick``.

        The type check is an ``assert``, so it is skipped when Python runs
        with -O. The return value is whatever the parent ``enqueue`` returns.
        """
        is_tick = isinstance(item, Tick)
        assert is_tick
        outcome = super().enqueue(item)
        return outcome