Commit ff6184f

Merge pull request #112 from knix-microfunctions/release/0.8.9
Release/0.8.9
2 parents 8287a3b + 9549933 commit ff6184f

101 files changed (+2180, -1429 lines)


DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java

Lines changed: 1 addition & 1 deletion
@@ -1728,7 +1728,7 @@ else if(sys.containsKey(key))
     InetAddress la = InetAddress.getByName(addr[0]);
     Socket s = new Socket(la, port);
     s.close();
-    riakNodes.put(la.getHostAddress(), port);
+    riakNodes.put(la.getCanonicalHostName(), port);
     System.out.println("Using riak node: "+la.getHostAddress()+":"+port.toString());
     break;
 } catch (UnknownHostException e) {
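
The one-line change above keys the riakNodes map by each node's canonical host name rather than its literal IP address. Purely as an illustration of that distinction, here is a rough Python analogue (the address and resolved name are hypothetical, and reverse DNS must be available for the lookup to succeed):

import socket

addr = "10.0.0.12"                     # hypothetical Riak node address
ip_key = socket.gethostbyname(addr)    # "10.0.0.12" -- analogous to la.getHostAddress()
name_key = socket.getfqdn(addr)        # e.g. "riak-0.internal.example" -- analogous to
                                       # la.getCanonicalHostName()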

FunctionWorker/MicroFunctionsAPI.thrift

Lines changed: 40 additions & 9 deletions
@@ -15,6 +15,25 @@
 */
 namespace java org.microfunctions.mfnapi
 
+struct TriggerAPIResult {
+    1: bool success
+    2: string message
+}
+
+struct TriggerInfoAMQP {
+    1: string amqp_addr
+    2: string routing_key
+    3: string exchange
+    4: bool with_ack
+    5: bool durable
+    6: bool exclusive
+    7: double ignr_msg_prob
+}
+
+struct TriggerInfoTimer {
+    1: i64 timerIntervalMilliseconds
+}
+
 service MicroFunctionsAPIService {
 
     string get_context_object_properties(),
@@ -23,10 +42,10 @@ service MicroFunctionsAPIService {
 
     void update_metadata(1: string metadata_name, 2: string metadata_value, 3: bool is_privileged_metadata),
 
-    void send_to_running_function_in_session(1: string rgid, 2: string message, 3: bool send_now), // message?
-    void send_to_all_running_functions_in_session_with_function_name(1: string gname, 2: string message, 3: bool send_now), // message
-    void send_to_all_running_functions_in_session(1: string message, 2: bool send_now), //message
-    void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now), // message
+    void send_to_running_function_in_session(1: string rgid, 2: string message, 3: bool send_now),
+    void send_to_all_running_functions_in_session_with_function_name(1: string gname, 2: string message, 3: bool send_now),
+    void send_to_all_running_functions_in_session(1: string message, 2: bool send_now),
+    void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now),
 
     list<string> get_session_update_messages(1: i32 count, 2: bool blck),
 
@@ -46,11 +65,11 @@ service MicroFunctionsAPIService {
 
     bool is_still_running(),
 
-    void add_workflow_next(1: string nxt, 2: string value), // value
-    void add_dynamic_next(1: string nxt, 2: string value), // value
-    void send_to_function_now(1: string destination, 2: string value), // value
-    void add_dynamic_workflow(1: list<map<string, string>> dynamic_trigger), // dynamic_trigger
-    list<map<string, string>> get_dynamic_workflow(), // return value
+    void add_workflow_next(1: string nxt, 2: string value),
+    void add_dynamic_next(1: string nxt, 2: string value),
+    void send_to_function_now(1: string destination, 2: string value),
+    void add_dynamic_workflow(1: list<map<string, string>> dynamic_trigger),
+    list<map<string, string>> get_dynamic_workflow(),
 
     i64 get_remaining_time_in_millis(),
     void log(1: string text, 2: string level),
@@ -89,7 +108,19 @@ service MicroFunctionsAPIService {
     void deleteCounter(1: string countername, 2: bool is_private, 3: bool is_queued),
     list<string> getCounterNames(1: i32 start_index, 2: i32 end_index, 3: bool is_private),
 
+    bool addTriggerableBucket(1: string bucket_name),
+    bool addStorageTriggerForWorkflow(1: string workflow_name, 2: string bucket_name),
+    bool deleteTriggerableBucket(1: string bucket_name),
+    bool deleteStorageTriggerForWorkflow(1: string workflow_name, 2: string bucket_name),
+
+    TriggerAPIResult addTriggerAMQP(1: string trigger_name, 2: TriggerInfoAMQP trigger_info),
+    TriggerAPIResult addTriggerTimer(1: string trigger_name, 2: TriggerInfoTimer trigger_info),
+    TriggerAPIResult addTriggerForWorkflow(1: string trigger_name, 2: string workflow_name, 3: string workflow_state),
+    TriggerAPIResult deleteTriggerForWorkflow(1: string trigger_name, 2: string workflow_name),
+    TriggerAPIResult deleteTrigger(1: string trigger_name),
+
     map<string, string> get_transient_data_output(1: bool is_private),
     map<string, bool> get_data_to_be_deleted(1: bool is_private)
+
 }
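
For orientation, below is a minimal sketch of how the new trigger methods might be called from Python through Thrift-generated bindings. The module paths, the already-connected client object, and all trigger/workflow names are assumptions; only the struct fields and method signatures come from the IDL above.

# Illustrative sketch only -- module paths and all names below are hypothetical;
# struct fields and method signatures are taken from MicroFunctionsAPI.thrift.
from mfnapi import MicroFunctionsAPIService          # assumed Thrift-generated module
from mfnapi.ttypes import TriggerInfoAMQP, TriggerInfoTimer

def register_triggers(client):
    # client: an already-connected MicroFunctionsAPIService.Client
    amqp_info = TriggerInfoAMQP(
        amqp_addr="amqp://rabbitmq:5672",   # hypothetical broker address
        routing_key="orders.*",
        exchange="egress_exchange",
        with_ack=False,
        durable=False,
        exclusive=False,
        ignr_msg_prob=0.0)

    result = client.addTriggerAMQP("orders_trigger", amqp_info)
    if result.success:
        # route messages arriving on this trigger to a workflow state
        client.addTriggerForWorkflow("orders_trigger", "order_workflow", "entry_state")
    else:
        print("addTriggerAMQP failed: " + result.message)

    # a timer trigger that fires every 60 seconds
    client.addTriggerTimer("minutely_trigger", TriggerInfoTimer(timerIntervalMilliseconds=60000))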

FunctionWorker/python/DataLayerClient.py

Lines changed: 14 additions & 3 deletions
@@ -23,7 +23,7 @@
 from data_layer.message.ttypes import Metadata
 from data_layer.service import DataLayerService
 
-MAX_RETRIES=3
+MAX_RETRIES = 3
 
 class DataLayerClient:
 
@@ -50,7 +50,7 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
             self.countertriggerstable = "counterTriggersTable"
             self.countertriggersinfotable = "counterTriggersInfoTable"
 
-        else:
+        elif suid is not None:
             self.keyspace = "storage_" + suid
             if tableName is not None:
                 self.tablename = tableName
@@ -62,6 +62,9 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
             self.triggersinfotablename = "triggersInfoTable"
             self.countertriggerstable = "counterTriggersTable"
             self.countertriggersinfotable = "counterTriggersInfoTable"
+        else:
+            print("[DataLayerClient]: Error in initializing; no required values given.")
+            return
 
         #print("Creating datalayer client in keyspace=%s, tablename=%s, maptablename=%s, settablename=%s, countertablename=%s" % (self.keyspace,self.tablename, self.maptablename, self.settablename, self.countertablename))
         self.locality = locality
@@ -447,17 +450,25 @@ def deleteSet(self, setname):
 
     def getSetNames(self, start_index=0, end_index=2147483647):
         sets = []
+        set_response = []
         for retry in range(MAX_RETRIES):
             try:
                 sets = self.datalayer.selectSets(self.keyspace, self.settablename, start_index, end_index, self.locality)
+                if sets is not None or isinstance(sets, list):
+                    for name in sets:
+                        if name.endswith("_outputkeys_set"):
+                            continue
+                        else:
+                            set_response.append(name)
+
                 break
             except TTransport.TTransportException as exc:
                 print("[DataLayerClient] Reconnecting because of failed getSetNames: " + str(exc))
                 self.connect()
             except Exception as exc:
                 print("[DataLayerClient] failed getSetNames: " + str(exc))
                 raise
-        return sets
+        return set_response
 
     # counter operations
     def createCounter(self, countername, count, tableName=None):
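
The reworked getSetNames now filters KNIX's internal bookkeeping sets out of the response. A minimal standalone sketch of that filter's effect (the set names are hypothetical):

# Sketch of the filtering newly applied in getSetNames; set names are hypothetical.
sets = ["cart_items", "session_123_outputkeys_set", "user_tags"]

set_response = [name for name in sets if not name.endswith("_outputkeys_set")]
print(set_response)   # ['cart_items', 'user_tags']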

FunctionWorker/python/FunctionWorker.py

Lines changed: 7 additions & 8 deletions
(Both hunks are whitespace-only: the first re-indents the InputPath/Parameters/ItemsPath block, the second removes a trailing blank line.)

@@ -338,18 +338,18 @@ def _fork_and_handle_message(self, key, encapsulated_value):
         if not has_error:
             try:
                 if "__state_action" not in metadata or (metadata["__state_action"] != "post_map_processing" and metadata["__state_action"] != "post_parallel_processing"):
-                        #self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input))
-                        function_input = self._state_utils.applyInputPath(raw_state_input)
-                        #self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input))
-                        function_input = self._state_utils.applyParameters(function_input)
-                        #self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input))
-                        function_input = self._state_utils.applyItemsPath(function_input) # process map items path
+                    #self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input))
+                    function_input = self._state_utils.applyInputPath(raw_state_input)
+                    #self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input))
+                    function_input = self._state_utils.applyParameters(function_input)
+                    #self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input))
+                    function_input = self._state_utils.applyItemsPath(function_input) # process map items path
 
                 #elif "Action" not in metadata or metadata["Action"] != "post_parallel_processing":
                 #    function_input = self._state_utils.applyInputPath(raw_state_input)
 
                 else:
-                        function_input = raw_state_input
+                    function_input = raw_state_input
             except Exception as exc:
                 self._logger.exception("InputPath processing exception: %s\n%s", str(instance_pid), str(exc))
                 error_type = "InputPath processing exception"
@@ -728,4 +728,3 @@ def main():
 
 if __name__ == '__main__':
     main()
-

FunctionWorker/python/LocalQueueClient.py

Lines changed: 32 additions & 58 deletions
@@ -15,13 +15,7 @@
 import time
 import socket
 
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TCompactProtocol
-
-from local_queue.service import LocalQueueService
-from local_queue.service.ttypes import LocalQueueMessage
+import redis
 
 class LocalQueueClient:
     '''
@@ -32,93 +26,73 @@ class LocalQueueClient:
 
     '''
     def __init__(self, connect="127.0.0.1:4999"):
-        self.qaddress = connect
+        self._qaddress = connect
        self.connect()
 
    def connect(self):
-        host, port = self.qaddress.split(':')
-        retry = 0.5 #s
        while True:
            try:
-                self.socket = TSocket.TSocket(host, int(port))
-                self.transport = TTransport.TFramedTransport(self.socket)
-                self.protocol = TCompactProtocol.TCompactProtocol(self.transport)
-                self.queue = LocalQueueService.Client(self.protocol)
-                self.transport.open()
+                self._queue = redis.Redis.from_url("redis://" + self._qaddress, decode_responses=True)
                break
-            except Thrift.TException as exc:
+            except Exception as exc:
                if retry < 60:
-                    print("[LocalQueueClient] Could not connect due to "+str(exc)+", retrying in "+str(retry)+"s")
+                    print("[LocalQueueClient] Could not connect due to " + str(exc) + ", retrying in " + str(retry) + "s")
                    time.sleep(retry)
                    retry = retry * 2
                else:
                    raise
 
-    def addMessage(self, topic, lqcm, ack):
+    def addMessage(self, topic, message, ack):
        status = True
-        message = LocalQueueMessage()
-        message.payload = lqcm.get_serialized().encode()
        try:
            if ack:
-                status = self.queue.addMessage(topic, message)
+                status = bool(self._queue.xadd(topic, message.get_message()))
            else:
-                self.queue.addMessageNoack(topic, message)
-        except TTransport.TTransportException as exc:
+                self._queue.xadd(topic, message.get_message())
+        except Exception as exc:
            print("[LocalQueueClient] Reconnecting because of failed addMessage: " + str(exc))
            status = False
            self.connect()
-        except Exception as exc:
-            print("[LocalQueueClient] failed addMessage: " + str(exc))
-            raise
 
        return status
 
    def getMessage(self, topic, timeout):
+        message = None
        try:
-            lqm = self.queue.getAndRemoveMessage(topic, timeout)
-            if lqm.index != 0:
-                return lqm
-        except TTransport.TTransportException as exc:
+            message_list = self._queue.xread({topic: 0}, block=timeout, count=1)
+            if message_list:
+                message = message_list[0][1][0][1]
+                # remove the message from the topic
+                msg_id = message_list[0][1][0][0]
+                self._queue.xdel(topic, msg_id)
+        except Exception as exc:
            print("[LocalQueueClient] Reconnecting because of failed getMessage: " + str(exc))
            self.connect()
-        except Exception as exc:
-            print("[LocalQueueClient] failed getMessage: " + str(exc))
-            raise
 
-        return None
+        return message
 
    def getMultipleMessages(self, topic, max_count, timeout):
+        message_list = []
        try:
-            lqm_list = self.queue.getAndRemoveMultiMessages(topic, max_count, timeout)
-        except TTransport.TTransportException as exc:
+            message_list = self._queue.xread({topic: "0"}, block=timeout, count=max_count)
+        except Exception as exc:
            print("[LocalQueueClient] Reconnecting because of failed getMultipleMessages: " + str(exc))
-            lqm_list = []
            self.connect()
-
-        except Exception as exc:
-            print("[LocalQueueClient] failed getMultipleMessages: " + str(exc))
-            raise
 
-        return lqm_list
+        msg_list = []
+        for msg in message_list[0][1]:
+            msg_list.append(msg[1])
+            # remove the message from the topic
+            self._queue.xdel(topic, msg[0])
+
+        return msg_list
 
    def shutdown(self):
-        if self.transport.isOpen():
-            #self.socket.handle.shutdown(socket.SHUT_RDWR)
-            self.transport.close()
+        self._queue.close()
 
    def addTopic(self, topic):
-        try:
-            self.queue.addTopic(topic)
-        except Thrift.TException as exc:
-            print("[LocalQueueClient] failed addTopic: " + str(exc))
-        except Exception as exc:
-            print("[LocalQueueClient] failed addTopic: " + str(exc))
-            raise
+        # no op with regular streams
+        return
 
    def removeTopic(self, topic):
-        try:
-            self.queue.removeTopic(topic)
-        except Thrift.TException as exc:
-            print("[LocalQueueClient] failed removeTopic: " + str(exc))
-        except Exception as exc:
-            print("[LocalQueueClient] failed removeTopic: " + str(exc))
-            raise
+        self._queue.xtrim(topic, "0", approximate=False)
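
The rewritten LocalQueueClient replaces the Thrift LocalQueueService with one Redis stream per topic. A standalone sketch of the underlying redis-py calls it now relies on (the topic name and payload are hypothetical, and a Redis instance must be reachable at the address the client is given):

import redis

# Connect the same way the new client does (default address taken from LocalQueueClient).
r = redis.Redis.from_url("redis://127.0.0.1:4999", decode_responses=True)

# addMessage(): one stream entry per message, carrying the key/value dict.
r.xadd("wf_topic", {"key": "k1", "value": "payload"})        # hypothetical topic

# getMessage(): read one entry from the start of the stream, then delete it.
entries = r.xread({"wf_topic": 0}, block=100, count=1)
if entries:
    stream_name, messages = entries[0]
    entry_id, fields = messages[0]
    print(fields)                                            # {'key': 'k1', 'value': 'payload'}
    r.xdel("wf_topic", entry_id)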

FunctionWorker/python/LocalQueueClientMessage.py

Lines changed: 7 additions & 24 deletions
@@ -12,42 +12,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from struct import pack, unpack
-
 class LocalQueueClientMessage:
-    '''
-    This class defines the message data structure used by the function worker.
-    It provides the utilities to convert the local queue message
-    and make the key and value fields easily accessible.
-    '''
+
    def __init__(self, lqm=None, key=None, value=None):
        if lqm is None and key is None and value is None:
            return
        elif lqm is not None:
-            self._serialized = lqm.payload
-            self._deserialize()
+            self._message = lqm
+            self._key = self._message["key"]
+            self._value = self._message["value"]
        elif key is not None and value is not None:
            self._key = key
            self._value = value
-            self._serialize()
-
-    def _serialize(self):
-        length = 4 + len(self._key)
-        self._serialized = pack('!I', length)
-        self._serialized = self._serialized + self._key.encode() + self._value.encode()
-        self._serialized = self._serialized.decode()
-
-    def _deserialize(self):
-        length = unpack('!I', self._serialized[0:4])[0]
-        self._key = self._serialized[4:length].decode()
-        self._value = self._serialized[length:].decode()
+            self._message = {"key": self._key, "value": self._value}
 
    def get_key(self):
        return self._key
 
    def get_value(self):
        return self._value
 
-    def get_serialized(self):
-        return self._serialized
-
+    def get_message(self):
+        return self._message
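
With the struct-based serialization removed, a queue message is now just a dict holding the key and the value. A short round-trip sketch using the two rewritten classes (the import paths, topic name, and payload are assumptions based on the file layout shown above):

from LocalQueueClient import LocalQueueClient
from LocalQueueClientMessage import LocalQueueClientMessage

lqc = LocalQueueClient(connect="127.0.0.1:4999")

# Producer: get_message() yields {"key": ..., "value": ...}, which addMessage() xadd's.
out_msg = LocalQueueClientMessage(key="instance_42", value="{\"result\": 7}")
lqc.addMessage("wf_topic", out_msg, ack=True)                # hypothetical topic

# Consumer: the raw dict read from the stream is wrapped again via lqm=.
raw = lqc.getMessage("wf_topic", timeout=100)
if raw is not None:
    in_msg = LocalQueueClientMessage(lqm=raw)
    print(in_msg.get_key(), in_msg.get_value())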
