forked from SvenskaSpel/locust-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
socketio_ex.py
28 lines (20 loc) · 837 Bytes
/
socketio_ex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import json
from locust import task
from locust_plugins.users.socketio import SocketIOUser
class MySocketIOUser(SocketIOUser):
@task
def my_task(self):
self.my_value = None
self.connect("wss://example.com/socket.io/?EIO=3&transport=websocket")
# example of subscribe
self.send('42["subscribe",{"url":"/sport/matches/11995208/draws","sendInitialUpdate": true}]')
# wait until I get a push message to on_message
while not self.my_value:
time.sleep(0.1)
# wait for additional pushes, while occasionally sending heartbeats, like a real client would
self.sleep_with_heartbeat(10)
def on_message(self, message):
self.my_value = json.loads(message)["my_value"]
if __name__ == "__main__":
host = "http://example.com"