-
Notifications
You must be signed in to change notification settings - Fork 3
/
battleEventSession.py
100 lines (81 loc) · 2.71 KB
/
battleEventSession.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import BigWorld
from events import Event, OnBattleStart, OnBattleResult
from ..common.asyncResponse import post_async
from ..common.exceptionSending import with_exception_sending
from ..utils import print_log, print_debug
# Use the real crypto implementation when it can be imported; otherwise fall
# back to the placeholder module that exposes the same ``encrypt`` name.
# Only ImportError triggers the fallback, so any other failure raised while
# importing the crypto module is surfaced instead of being silently hidden.
try:
    from ..common.crypto import encrypt
except ImportError:
    from ..common.cryptoPlaceholder import encrypt
    print_log('import cryptoPlaceholder')
else:
    print_log('import crypto')
class BattleEventSession:
    """Queue battle events and post them in batches to the event URL.

    Constructing a session posts the serialized battle-start payload to the
    init URL. The value handed to the init callback is stored as ``token``,
    copied onto every queued event before it is sent, and starts the
    periodic send loop driven by ``BigWorld.callback``.
    """

    # Class-level defaults. __init__ rebinds every one of them on the
    # instance, so the list below is not shared between sessions.
    send_queue = []
    token = None
    initURL = ''
    eventURL = ''
    send_interval = 5
    arenaID = None
    enable = False
    # True once end_event_session has been called.
    _ended = False

    def __init__(self, event_URL, init_URL, on_end_load_event, sendInterval=5):
        # type: (str, str, OnBattleStart, float) -> None
        """Create the session and post the initial payload to ``init_URL``.

        Args:
            event_URL: URL that batches of queued events are posted to.
            init_URL: URL that the initial payload is posted to.
            on_end_load_event: event supplying ``arenaID`` and ``get_dict()``.
            sendInterval: delay handed to ``BigWorld.callback`` between sends.
        """
        self.send_queue = []
        self.token = None
        self.initURL = init_URL
        self.eventURL = event_URL
        self.send_interval = sendInterval
        self.arenaID = on_end_load_event.arenaID
        self.enable = False
        self._ended = False

        data = json.dumps(on_end_load_event.get_dict())
        print_debug(data)
        post_async(self.initURL, encrypt(data), self.__init_send_callback)

    def add_event(self, event):
        # type: (Event) -> None
        """Append ``event`` to the queue sent by the next loop iteration."""
        self.send_queue.append(event)

    def end_event_session(self, battle_result_event):
        # type: (OnBattleResult) -> None
        """Queue the final event and stop the send loop from re-arming.

        An already scheduled loop iteration still runs once more and sends
        whatever is queued, including ``battle_result_event``.
        """
        self.add_event(battle_result_event)
        self._ended = True
        self.enable = False

    def __init_send_callback(self, res):
        # type: (str) -> None
        """Store the init response as the token and start the send loop."""
        self.token = res
        print_log('setToken: ' + str(res))
        if not self.enable:
            # When the session was ended before this callback fired, keep the
            # loop disabled: the call below then sends the queued events once
            # and does not schedule itself again.
            self.enable = not self._ended
            self.__send_event_loop()

    def __send_event_loop(self):
        """Send the queued events, then re-schedule while enabled."""
        for event in self.send_queue:
            event.token = self.token
        self.__post_events(self.send_queue)
        self.send_queue = []
        if self.enable:
            BigWorld.callback(self.send_interval, self.__send_event_loop)

    @with_exception_sending
    def __post_events(self, events, callback=None):
        """Serialize ``events`` and post them to the event URL.

        Does nothing when ``events`` is empty or None.
        """
        if not events:
            return
        # Build a real list: json.dumps cannot serialize the iterator that
        # map() returns on Python 3. Serialize once and reuse the string.
        payload = json.dumps({
            'events': [event.get_dict() for event in events]
        })
        print_log(payload)
        post_async(self.eventURL, encrypt(payload), callback)
class HangarEventSession:
    """Queue hangar events and post them in batches to the hangar event URL.

    The send loop is started from the constructor and re-schedules itself
    through ``BigWorld.callback`` on every iteration.
    """

    def __init__(self, hangar_event_URL, sendInterval=5):
        """Create the session and start the periodic send loop.

        Args:
            hangar_event_URL: URL that batches of queued events are posted to.
            sendInterval: delay handed to ``BigWorld.callback`` between sends.
        """
        self.send_queue = []
        self.eventURL = hangar_event_URL
        self.send_interval = sendInterval
        self.__send_event_loop()

    def add_event(self, event):
        """Append ``event`` to the queue sent by the next loop iteration."""
        self.send_queue.append(event)

    def __send_event_loop(self):
        """Schedule the next iteration, then send and clear the queue."""
        # The next tick is registered before the post is attempted.
        BigWorld.callback(self.send_interval, self.__send_event_loop)
        self.__post_events(self.send_queue)
        self.send_queue = []

    @with_exception_sending
    def __post_events(self, events, callback=None):
        """Serialize ``events`` and post them to the event URL.

        Does nothing when ``events`` is empty or None.
        """
        if not events:
            return
        # Build a real list: json.dumps cannot serialize the iterator that
        # map() returns on Python 3. Serialize once and reuse the string.
        payload = json.dumps({
            'events': [event.get_dict() for event in events]
        })
        print_debug(payload)
        post_async(self.eventURL, encrypt(payload), callback)