Skip to content

Commit a434a1a

Browse files
authored
Merge pull request #216 from fabric-testbed/multi_threaded
Performance Improvements and more
2 parents 39f69af + 8edad0b commit a434a1a

File tree

77 files changed

+1734
-1093
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1734
-1093
lines changed

Dockerfile-auth

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,30 @@
11
FROM python:3.9.0
22
MAINTAINER Komal Thareja<[email protected]>
33

4-
ARG HANDLERS_VER=1.3.1
4+
ARG HANDLERS_VER=1.3.2
55

66
RUN mkdir -p /usr/src/app
77
WORKDIR /usr/src/app
88
VOLUME ["/usr/src/app"]
99

1010
EXPOSE 11000
1111

12+
RUN apt-get update
13+
RUN apt-get install cron -y
14+
1215
COPY requirements.txt /usr/src/app/
16+
COPY docker-entrypoint.sh /usr/src/app/
1317
COPY fabric_cf /usr/src/app/fabric_cf
18+
COPY tools/cleanup.py /usr/src/app/fabric_cf/
19+
COPY tools/install.sh /usr/src/app/fabric_cf/
20+
1421
RUN pip3 install --no-cache-dir -r requirements.txt
1522
RUN mkdir -p "/etc/fabric/message_bus/schema"
1623
RUN mkdir -p "/etc/fabric/actor/config"
1724
RUN mkdir -p "/var/log/actor"
1825
RUN cp /usr/local/lib/python3.9/site-packages/fabric_mb/message_bus/schema/*.avsc /etc/fabric/message_bus/schema
1926
RUN pip3 install fabric-am-handlers==${HANDLERS_VER}
27+
RUN sh /usr/src/app/fabric_cf/install.sh
2028

21-
ENTRYPOINT ["python3"]
22-
CMD ["-m", "fabric_cf.authority"]
29+
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
30+
CMD ["fabric_cf.authority"]

Dockerfile-broker

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,21 @@ VOLUME ["/usr/src/app"]
77

88
EXPOSE 11000
99

10+
RUN apt-get update
11+
RUN apt-get install cron -y
12+
1013
COPY requirements.txt /usr/src/app/
14+
COPY docker-entrypoint.sh /usr/src/app/
1115
COPY fabric_cf /usr/src/app/fabric_cf
16+
COPY tools/cleanup.py /usr/src/app/fabric_cf/
17+
COPY tools/install.sh /usr/src/app/fabric_cf/
18+
1219
RUN pip3 install --no-cache-dir -r requirements.txt
1320
RUN mkdir -p "/etc/fabric/message_bus/schema"
1421
RUN mkdir -p "/etc/fabric/actor/config"
1522
RUN mkdir -p "/var/log/actor"
1623
RUN cp /usr/local/lib/python3.9/site-packages/fabric_mb/message_bus/schema/*.avsc /etc/fabric/message_bus/schema
24+
RUN sh /usr/src/app/fabric_cf/install.sh
1725

18-
ENTRYPOINT ["python3"]
19-
20-
CMD ["-m", "fabric_cf.broker"]
26+
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
27+
CMD ["fabric_cf.broker"]

Dockerfile-cf

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,23 @@ VOLUME ["/usr/src/app"]
77

88
EXPOSE 11000
99

10+
RUN apt-get update
11+
RUN apt-get install cron -y
12+
1013
COPY requirements.txt /usr/src/app/
1114
COPY fabric_cf /usr/src/app/fabric_cf
15+
COPY tools/cleanup.py /usr/src/app/fabric_cf/
16+
COPY tools/install.sh /usr/src/app/fabric_cf/
17+
1218
RUN pip3 install --no-cache-dir -r requirements.txt
1319
RUN mkdir -p "/etc/fabric/message_bus/schema"
1420
RUN mkdir -p "/etc/fabric/actor/config"
1521
RUN mkdir -p "/var/log/actor"
1622
RUN cp /usr/local/lib/python3.9/site-packages/fabric_mb/message_bus/schema/*.avsc /etc/fabric/message_bus/schema
1723

18-
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
24+
RUN echo "0 2 * * * root /usr/local/bin/python3.9 /usr/src/app/cleanup.py -f /etc/fabric/actor/config/config.yaml -d 30 -c slices -o remove" >> /etc/crontab
25+
RUN service cron reload
26+
RUN service cron restart
27+
28+
ENTRYPOINT ["sh"]
29+
CMD ["tail", "-f", "/dev/null"]

Dockerfile-orchestrator

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,21 @@ VOLUME ["/usr/src/app"]
88
EXPOSE 11000
99
EXPOSE 8700
1010

11+
RUN apt-get update
12+
RUN apt-get install cron -y
13+
1114
COPY requirements.txt /usr/src/app/
15+
COPY docker-entrypoint.sh /usr/src/app/
1216
COPY fabric_cf /usr/src/app/fabric_cf
17+
COPY tools/cleanup.py /usr/src/app/fabric_cf/
18+
COPY tools/install.sh /usr/src/app/fabric_cf/
19+
1320
RUN pip3 install --no-cache-dir -r requirements.txt
1421
RUN mkdir -p "/etc/fabric/message_bus/schema"
1522
RUN mkdir -p "/etc/fabric/actor/config"
1623
RUN mkdir -p "/var/log/actor"
1724
RUN cp /usr/local/lib/python3.9/site-packages/fabric_mb/message_bus/schema/*.avsc /etc/fabric/message_bus/schema
25+
RUN sh /usr/src/app/fabric_cf/install.sh
1826

19-
20-
ENTRYPOINT ["python3"]
21-
22-
CMD ["-m", "fabric_cf.orchestrator"]
27+
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
28+
CMD ["fabric_cf.orchestrator"]

docker-entrypoint.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
#!/bin/sh
2-
tail -f /dev/null
2+
service cron start
3+
python3.9 -m $1

fabric_cf/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__VERSION__ = "1.3.0"
1+
__VERSION__ = "1.3.1"

fabric_cf/actor/boot/configuration_processor.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,12 @@ def process(self):
9999
try:
100100
self.create_actor()
101101
self.initialize_actor()
102-
self.logger.info(f"There are {ActorMixin.actor_count} actors")
103102
self.register_actor()
104103
self.create_default_slice()
105104
self.populate_inventory_neo4j()
106105
self.recover_actor()
107106
self.enable_ticking()
108107
self.process_topology()
109-
self.logger.info(f"Processing exports with actor count {ActorMixin.actor_count}")
110108
self.process_advertise()
111109
self.logger.info("Processing exports completed")
112110
except Exception as e:
@@ -211,7 +209,7 @@ def make_actor_policy(self, *, actor: ABCActorMixin, config: ActorConfig):
211209
"""
212210
Creates Actor Policy instance
213211
@param actor actor
214-
@param actor_config actor config
212+
@param config actor config
215213
@raises ConfigurationException in case of error
216214
"""
217215
policy = None
@@ -349,7 +347,6 @@ def initialize_actor(self):
349347
@raises ConfigurationException in case of error
350348
"""
351349
try:
352-
ActorMixin.actor_count += 1
353350
self.actor.initialize()
354351
except Exception as e:
355352
raise ConfigurationException(f"Actor failed to initialize: {self.actor.get_name()} {e}")

fabric_cf/actor/core/apis/abc_actor_mixin.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,12 @@ def query(self, *, query: dict = None, caller: AuthToken = None,
300300
Returns:
301301
query response
302302
"""
303+
@abstractmethod
304+
def execute_on_actor_thread(self, *, runnable: ABCActorRunnable):
305+
"""
306+
Execute on Actor Thread and Wait until response is processed
307+
@params runnable: reservation to be processed
308+
"""
303309

304310
@abstractmethod
305311
def execute_on_actor_thread_and_wait(self, *, runnable: ABCActorRunnable):
@@ -545,6 +551,19 @@ def close_by_rid(self, *, rid: ID):
545551
Exception in case of error
546552
"""
547553

554+
@abstractmethod
555+
def close_delegation(self, *, did: str):
556+
"""
557+
Closes the delegation. Note: the delegation must have already been registered with the actor.
558+
This method may involve either a client or a server side action or both. When called on a broker,
559+
this method will only close the broker delegation.
560+
561+
Args:
562+
did: delegation id
563+
Raises:
564+
Exception in case of error
565+
"""
566+
548567
@abstractmethod
549568
def close(self, *, reservation: ABCReservationMixin):
550569
"""
@@ -682,3 +701,19 @@ def unregister(self, *, reservation: ABCReservationMixin, rid: ID):
682701
Raises:
683702
Exception in case of error
684703
"""
704+
705+
def get_asm_thread(self):
706+
return None
707+
708+
@abstractmethod
709+
def remove_delegation(self, *, did: str):
710+
"""
711+
Removes the specified delegation. Note: the delegation must have already been registered with the actor.
712+
This method will unregister the reservation and remove it from the underlying database.
713+
Only closed and failed delegation can be removed.
714+
715+
Args:
716+
did: delegation id
717+
Raises:
718+
Exception if an error occurs or when trying to remove a delegation that is neither failed or closed.
719+
"""

fabric_cf/actor/core/apis/abc_database.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from __future__ import annotations
2727

2828
from abc import abstractmethod, ABC
29+
from datetime import datetime
2930
from typing import TYPE_CHECKING, List
3031

3132
from fabric_cf.actor.core.apis.abc_delegation import ABCDelegation
@@ -179,7 +180,7 @@ def get_client_reservations(self, *, slice_id: ID = None) -> List[ABCReservation
179180
@abstractmethod
180181
def get_slices(self, *, slice_id: ID = None, slice_name: str = None, project_id: str = None, email: str = None,
181182
state: list[int] = None, oidc_sub: str = None, slc_type: List[SliceTypes] = None,
182-
limit: int = None, offset: int = None) -> List[ABCSlice] or None:
183+
limit: int = None, offset: int = None, lease_end: datetime = None) -> List[ABCSlice] or None:
183184
"""
184185
Retrieves the specified slices.
185186
@@ -192,6 +193,7 @@ def get_slices(self, *, slice_id: ID = None, slice_name: str = None, project_id:
192193
@param slc_type slice type
193194
@param limit limit
194195
@param offset offset
196+
@param lease_end lease_end
195197
196198
@return list of slices
197199

fabric_cf/actor/core/apis/abc_delegation.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,3 +398,15 @@ def fail(self, *, message: str, exception: Exception = None):
398398
message: error message
399399
exception: exception
400400
"""
401+
402+
@abstractmethod
403+
def lock(self):
404+
"""
405+
Lock delegation
406+
"""
407+
408+
@abstractmethod
409+
def unlock(self):
410+
"""
411+
Unlock delegation
412+
"""

fabric_cf/actor/core/apis/abc_mgmt_actor.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ def close_reservation(self, *, rid: ID) -> bool:
126126
@return true for success; false otherwise
127127
"""
128128

129+
@abstractmethod
130+
def close_delegation(self, *, did: str) -> bool:
131+
"""
132+
Closes the specified delegation
133+
@param did delegation id
134+
@return true for success; false otherwise
135+
"""
136+
129137
@abstractmethod
130138
def close_reservations(self, *, slice_id: ID) -> bool:
131139
"""
@@ -143,6 +151,15 @@ def remove_reservation(self, *, rid: ID) -> bool:
143151
@return true for success; false otherwise
144152
"""
145153

154+
@abstractmethod
155+
def remove_delegation(self, *, did: str) -> bool:
156+
"""
157+
Removes the specified delegation.
158+
Note only closed delegation can be removed.
159+
@param did delegation id of the delegation to be removed
160+
@return true for success; false otherwise
161+
"""
162+
146163
@abstractmethod
147164
def get_reservation_state_for_reservations(self, *, reservation_list: List[str]) -> List[ReservationStateAvro]:
148165
"""

fabric_cf/actor/core/apis/abc_reservation_mixin.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,3 +508,15 @@ def handle_failed_rpc(self, *, failed: FailedRPC):
508508
Processes a failed RPC request.
509509
@param failed failed
510510
"""
511+
512+
@abstractmethod
513+
def lock(self):
514+
"""
515+
Lock the reservation
516+
"""
517+
518+
@abstractmethod
519+
def unlock(self):
520+
"""
521+
Unlock the reservation
522+
"""

fabric_cf/actor/core/apis/abc_reservation_resources.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
from fabric_cf.actor.core.time.term import Term
3535
from fabric_cf.actor.core.util.resource_type import ResourceType
3636
from fabric_cf.actor.core.kernel.resource_set import ResourceSet
37-
from fabric_cf.actor.core.util.resource_count import ResourceCount
3837

3938

4039
class ABCReservationResources(ABC):
@@ -48,19 +47,6 @@ class ABCReservationResources(ABC):
4847
previousResources: the previous resource set bound to the reservation
4948
leasedResources: the concrete resources bound to the reservation.
5049
"""
51-
52-
@abstractmethod
53-
def count(self, *, rc: ResourceCount, when: datetime):
54-
"""
55-
Counts the number of resources in the reservation relative to the specified time.
56-
The ResourceCount object is updated with the count of active, pending, expired, failed, etc. units.
57-
Note: "just a hint" unless the kernel lock is held.
58-
59-
Args:
60-
rc: holder for counts
61-
time: when instance
62-
"""
63-
6450
@abstractmethod
6551
def get_approved_resources(self) -> ResourceSet:
6652
"""

fabric_cf/actor/core/apis/abc_slice.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,3 +467,15 @@ def unregister_delegation(self, *, delegation: ABCDelegation):
467467
468468
@param delegation delegation to unregister
469469
"""
470+
471+
@abstractmethod
472+
def lock_slice(self):
473+
"""
474+
Lock slice
475+
"""
476+
477+
@abstractmethod
478+
def unlock_slice(self):
479+
"""
480+
Unlock slice
481+
"""

fabric_cf/actor/core/common/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ class Constants:
174174
PROPERTY_CONF_CLASS_NAME = 'class'
175175
PROPERTY_CONF_PROPERTIES_NAME = 'properties'
176176

177+
CONFIG_SECTION_ACTOR = "actor"
178+
177179
PROTOCOL_LOCAL = "local"
178180
PROTOCOL_KAFKA = "kafka"
179181

fabric_cf/actor/core/container/globals.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ def timer_loop(self):
476476
"""
477477
Timer thread run function
478478
"""
479-
self.log.debug("Timer thread started")
479+
self.log.debug(f"Timer thread started")
480480
while True:
481481
with self.timer_condition:
482482
while self.timer_scheduler.empty() and self.started:
@@ -485,17 +485,17 @@ def timer_loop(self):
485485
self.timer_condition.wait()
486486
except InterruptedError as e:
487487
self.log.error(traceback.format_exc())
488-
self.log.error("Timer thread interrupted. Exiting {}".format(e))
488+
self.log.error(f" Timer thread interrupted. Exiting {e}")
489489
return
490490

491491
if not self.started:
492-
self.log.info("Timer thread exiting")
492+
self.log.info(f"Timer thread exiting")
493493
return
494494

495495
self.timer_condition.notify_all()
496496

497497
if not self.timer_scheduler.empty():
498-
#self.log.debug("Executing Scheduled items")
498+
#self.log.debug(f"Executing Scheduled items")
499499
self.timer_scheduler.run(blocking=False)
500500

501501

fabric_cf/actor/core/container/message_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ def handle_message(self, message: AbcMessageAvro):
103103
message.get_message_name() == AbcMessageAvro.update_reservation or \
104104
message.get_message_name() == AbcMessageAvro.remove_reservation or \
105105
message.get_message_name() == AbcMessageAvro.extend_reservation or \
106+
message.get_message_name() == AbcMessageAvro.close_delegations or \
107+
message.get_message_name() == AbcMessageAvro.remove_delegation or \
106108
message.get_message_name() == AbcMessageAvro.maintenance_request:
107109
self.kafka_mgmt_service.process(message=message)
108110
else:

0 commit comments

Comments
 (0)