-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_dynamo.py
95 lines (78 loc) · 3.19 KB
/
client_dynamo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""
This file
"""
import time
import grpc
from dynamo_pb2_grpc import DynamoInterfaceStub
from dynamo_pb2 import GetRequest, GetResponse, PutRequest, PutResponse, VectorClock, VectorClockItem, NoParams, FailRequest
import logging
logger = logging.getLogger('dynamo_client')
logger.setLevel(logging.INFO)
def bidirectional_get(stub, client_id):
"""
The stub contains a method which can give the client a response iterator
it can use to get the responses from the server. In tuern it has to send
a request iterator to the server, so that it can iterate through all the requests.
"""
raise NotImplementedError
def get(stub, client_id, key):
"""
Regular get request
"""
request = GetRequest(client_id=client_id, key=key, hinted_handoff=-1)
response : GetResponse = stub.Get(request)
logging.info(f"Get Response recieved from {response.server_id}")
return response
def put(stub, request: PutRequest):
"""
Regular put request
"""
response : PutResponse = stub.Put(request)
logging.info(f"Put Response recieved from {response.server_id}")
return response
def client_get(port, client_id, key=1):
logging.info("-------------Sending GET request !!!--------------")
with grpc.insecure_channel(f"localhost:{port}") as channel:
stub = DynamoInterfaceStub(channel)
response = get(stub, client_id, key)
if response.reroute == True:
# sending to actual coordinator node
with grpc.insecure_channel(f"localhost:{response.reroute_server_id}") as channel:
stub = DynamoInterfaceStub(channel)
response = get(stub, client_id, key)
return response
def client_put(port, client_id, key=1, val="1", context=None):
# item = VectorClockItem(server_id=1, count=1)
if context is None:
context = VectorClock(clock=[]) # An existing context only needs to be passed when updating an existing key's value
request = PutRequest(client_id=client_id, key=key, val=val, context=context, hinted_handoff=-1)
with grpc.insecure_channel(f"localhost:{port}") as channel:
stub = DynamoInterfaceStub(channel)
response = put(stub, request)
if response.reroute == True:
# sending it to actual coordinator node
with grpc.insecure_channel(f"localhost:{response.reroute_server_id}") as channel:
stub = DynamoInterfaceStub(channel)
response = put(stub, request)
return response
def client_get_memory(port):
with grpc.insecure_channel(f"localhost:{port}") as channel:
stub = DynamoInterfaceStub(channel)
request = NoParams()
response = stub.PrintMemory(request)
return response.mem, response.mem_replicated
def client_fail(port, fail=True):
with grpc.insecure_channel(f"localhost:{port}") as channel:
stub = DynamoInterfaceStub(channel)
request = FailRequest(fail=fail)
response = stub.Fail(request)
def client_gossip(port):
"""
Turn gossip protocol on for this node
"""
with grpc.insecure_channel(f"localhost:{port}") as channel:
stub = DynamoInterfaceStub(channel)
request = NoParams()
response = stub.Gossip(request)
return response
# client_put(2333, 1)