Skip to content

Commit 76f72b3

Browse files
authored
Merge pull request #361 from fabric-testbed/leaks
Network Service provisioning failures - interface already in use due to allocation algorithm deficiencies
2 parents 473f396 + 1e67698 commit 76f72b3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+402
-7796
lines changed

Dockerfile-auth

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM python:3.11.0
22
MAINTAINER Komal Thareja<[email protected]>
33

4-
ARG HANDLERS_VER=1.6.1
4+
ARG HANDLERS_VER=1.6.2
55

66
RUN mkdir -p /usr/src/app
77
WORKDIR /usr/src/app
@@ -25,7 +25,6 @@ RUN mkdir -p "/etc/fabric/actor/config"
2525
RUN mkdir -p "/var/log/actor"
2626
RUN cp /usr/local/lib/python3.11/site-packages/fabric_mb/message_bus/schema/*.avsc /etc/fabric/message_bus/schema
2727
RUN pip3 install fabric-am-handlers==${HANDLERS_VER}
28-
RUN pip3 install fabric-message-bus==1.6.1
2928
RUN sh /usr/src/app/install.sh
3029

3130
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]

fabric_cf/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
__version__ = "1.6.2"
1+
__version__ = "1.6.1"
22
__VERSION__ = __version__

fabric_cf/actor/boot/configuration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ def get_kafka_key_schema_location(self) -> str or None:
460460
return self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_KEY_SCHEMA, None)
461461

462462
def get_kafka_consumer_auto_commit_interval(self) -> int:
463-
return int(self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_AUTO_COMMIT_INTERVAL), 5)
463+
return int(self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_AUTO_COMMIT_INTERVAL, 5))
464464

465465
def get_kafka_consumer_commit_batch_size(self) -> int:
466466
return int(self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_BATCH_SIZE, 1))

fabric_cf/actor/core/apis/abc_actor_mixin.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,14 +554,15 @@ def fail_delegation(self, *, did: str, message: str):
554554
"""
555555

556556
@abstractmethod
557-
def close_by_rid(self, *, rid: ID):
557+
def close_by_rid(self, *, rid: ID, force: bool = False):
558558
"""
559559
Closes the reservation. Note: the reservation must have already been registered with the actor.
560560
This method may involve either a client or a server side action or both. When called on a broker,
561561
this method will only close the broker reservation.
562562
563563
Args:
564564
rid: reservation id
565+
force: force close
565566
Raises:
566567
Exception in case of error
567568
"""

fabric_cf/actor/core/apis/abc_database.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
from abc import abstractmethod, ABC
2929
from datetime import datetime
30-
from typing import TYPE_CHECKING, List, Union
30+
from typing import TYPE_CHECKING, List, Union, Tuple, Dict
3131

3232
from fabric_cf.actor.core.apis.abc_delegation import ABCDelegation
3333
from fabric_cf.actor.core.kernel.slice import SliceTypes
@@ -170,6 +170,17 @@ def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, pr
170170
@throws Exception in case of error
171171
"""
172172

173+
@abstractmethod
174+
def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None,
175+
bdf: str = None) -> Dict[str, List[str]]:
176+
"""
177+
Retrieves the components.
178+
179+
@return dictionary of components (string keys mapped to lists of strings)
180+
181+
@throws Exception in case of error
182+
"""
183+
173184
@abstractmethod
174185
def get_client_reservations(self, *, slice_id: ID = None) -> List[ABCReservationMixin]:
175186
"""

fabric_cf/actor/core/apis/abc_reservation_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def can_renew(self) -> bool:
282282
"""
283283

284284
@abstractmethod
285-
def close(self):
285+
def close(self, force: bool = False):
286286
"""
287287
Closes the reservation. Locked with the kernel lock.
288288
"""
@@ -359,7 +359,7 @@ def probe_pending(self):
359359
"""
360360

361361
@abstractmethod
362-
def reserve(self, *, policy: ABCPolicy):
362+
def reserve(self, *, policy: ABCPolicy) -> bool:
363363
"""
364364
Reserve resources: ticket() initiate or request, or redeem()
365365
request. New reservation.

fabric_cf/actor/core/core/actor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ def fail_delegation(self, *, did: str, message: str):
154154
def close_delegation(self, *, did: str):
155155
self.wrapper.close_delegation(did=did)
156156

157-
def close_by_rid(self, *, rid: ID):
158-
self.wrapper.close(rid=rid)
157+
def close_by_rid(self, *, rid: ID, force: bool = False):
158+
self.wrapper.close(rid=rid, force=force)
159159

160160
def close(self, *, reservation: ABCReservationMixin):
161161
if reservation is not None:

fabric_cf/actor/core/delegation/broker_delegation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ def accept_delegation_update(self, *, incoming: ABCDelegation, update_data: Upda
218218
self.delegation_update_satisfies(incoming=incoming, update_data=update_data)
219219
self.absorb_delegation_update(incoming=incoming, update_data=update_data)
220220
except Exception as e:
221+
if incoming.get_graph() is not None:
222+
incoming.get_graph().delete_graph()
221223
success = False
222224
update_data.error(message=str(e))
223225
self.logger.error(traceback.format_exc())

fabric_cf/actor/core/kernel/authority_reservation.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,20 @@ def prepare(self, *, callback: ABCCallbackProxy, logger):
139139

140140
self.state = ReservationStates.Ticketed
141141

142-
def reserve(self, *, policy: ABCPolicy):
142+
def reserve(self, *, policy: ABCPolicy) -> bool:
143143
self.nothing_pending()
144144
self.incoming_request()
145145
if self.is_active():
146-
self.error(err="reservation already holds a lease")
146+
#self.error(err="reservation already holds a lease")
147+
self.logger.warning(f"Reservation: {self.get_reservation_id()} already holds a lease")
148+
return False
147149

148150
self.policy = policy
149151
self.approved = False
150152
self.bid_pending = True
151153
self.pending_recover = False
152154
self.map_and_update(extend=False)
155+
return True
153156

154157
def service_reserve(self):
155158
try:
@@ -210,7 +213,7 @@ def service_modify_lease(self):
210213
self.logger.error("authority failed servicing modifylease e: {}".format(e))
211214
self.fail_notify(message=str(e))
212215

213-
def close(self):
216+
def close(self, force: bool = False):
214217
self.logger.debug("Processing close for #{}".format(self.rid))
215218
self.transition(prefix="external close", state=self.state, pending=ReservationPendingStates.Closing)
216219

@@ -418,8 +421,12 @@ def probe_pending(self):
418421

419422
elif self.pending_state == ReservationPendingStates.Closing:
420423
if self.resources is None or self.resources.is_closed():
421-
self.transition(prefix="close complete", state=ReservationStates.Closed,
422-
pending=ReservationPendingStates.None_)
424+
if self.update_data.failed:
425+
self.transition(prefix="close complete failed", state=ReservationStates.CloseFail,
426+
pending=ReservationPendingStates.None_)
427+
else:
428+
self.transition(prefix="close complete", state=ReservationStates.Closed,
429+
pending=ReservationPendingStates.None_)
423430
self.pending_recover = False
424431
self.generate_update()
425432

@@ -515,7 +522,10 @@ def reap(self):
515522
released = self.resources.collect_released()
516523
if released is not None:
517524
if not released.get_notices().is_empty():
518-
self.update_data.post(event=released.get_notices().get_notice())
525+
notice = released.get_notices().get_notice()
526+
self.update_data.post(event=notice)
527+
if "Exception" in notice:
528+
self.update_data.error(message=notice)
519529
self.policy.release(resources=released)
520530
except Exception as e:
521531
self.logger.error("exception in authority reap e: {}".format(e))

fabric_cf/actor/core/kernel/broker_reservation.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ def prepare(self, *, callback: ABCCallbackProxy, logger):
221221

222222
self.set_dirty()
223223

224-
def reserve(self, *, policy: ABCPolicy):
224+
def reserve(self, *, policy: ABCPolicy) -> bool:
225225
# These handlers may need to be slightly more sophisticated, since a
226226
# client may bid multiple times on a ticket as part of an auction
227227
# protocol: so we may receive a reserve or extend when there is already
@@ -232,12 +232,13 @@ def reserve(self, *, policy: ABCPolicy):
232232
self.pending_state != ReservationPendingStates.Ticketing:
233233
# We do not want to fail the reservation simply log a warning and exit from reserve
234234
self.logger.warning("Duplicate ticket request")
235-
return
235+
return False
236236

237237
self.policy = policy
238238
self.approved = False
239239
self.bid_pending = True
240240
self.map_and_update(ticketed=False)
241+
return True
241242

242243
def service_reserve(self):
243244
# resources is null initially. It becomes non-null once the
@@ -276,13 +277,13 @@ def service_extend_ticket(self):
276277
pending=ReservationPendingStates.None_)
277278
self.generate_update()
278279

279-
def close(self):
280+
def close(self, force: bool = False):
280281
send_notification = False
281282
if self.state == ReservationStates.Nascent or self.pending_state != ReservationPendingStates.None_:
282283
self.logger.warning("Closing a reservation in progress")
283284
send_notification = True
284285

285-
if self.state != ReservationStates.Closed:
286+
if self.state not in [ReservationStates.Closed, ReservationStates.CloseFail] or force:
286287
if self.pending_state == ReservationPendingStates.Priming or \
287288
(self.pending_state == ReservationPendingStates.Ticketing and not self.bid_pending):
288289
# Close in Priming is a special case: when processing the close
@@ -296,7 +297,11 @@ def close(self):
296297
self.logger.debug("closing reservation #{} while in Priming".format(self.rid))
297298
self.closed_in_priming = True
298299

299-
self.transition(prefix="closed", state=ReservationStates.Closed, pending=ReservationPendingStates.None_)
300+
if not self.update_data.is_failed():
301+
self.transition(prefix="closed", state=ReservationStates.Closed, pending=ReservationPendingStates.None_)
302+
else:
303+
self.transition(prefix="closed-failed", state=ReservationStates.CloseFail,
304+
pending=ReservationPendingStates.None_)
300305
self.policy.closed(reservation=self)
301306

302307
if send_notification:
@@ -536,7 +541,16 @@ def set_authority(self, *, authority: ABCAuthorityProxy):
536541
def update_lease(self, *, incoming: ABCReservationMixin, update_data):
537542
self.logger.info(f"Received Update Lease: {incoming} at Broker")
538543
# TODO add any processing if needed
539-
self.logger.info(f"Do Nothing!")
544+
if incoming.get_resources() and incoming.get_resources().get_sliver() and incoming.get_resources().get_sliver().get_reservation_info():
545+
incoming_state = incoming.get_resources().get_sliver().get_reservation_info().reservation_state
546+
else:
547+
incoming_state = None
548+
self.logger.info(f"Update Lease from authority in state: {self.get_state()} "
549+
f"Incoming: {incoming_state}|{incoming.get_notices()} update_data: {update_data}!")
550+
if incoming_state and incoming_state == str(ReservationStates.CloseFail):
551+
self.update_data.absorb(other=update_data)
552+
self.logger.info("Closing a reservation which failed to delete at the authority")
553+
self.close()
540554

541555
def handle_failed_rpc(self, *, failed: FailedRPC):
542556
if failed.get_request_type() == RPCRequestType.UpdateTicket and \

fabric_cf/actor/core/kernel/kernel.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ def amend_reserve(self, *, reservation: ABCReservationMixin):
8585
"""
8686
try:
8787
reservation.lock()
88-
reservation.reserve(policy=self.policy)
88+
ignore = reservation.reserve(policy=self.policy)
8989
self.plugin.get_database().update_reservation(reservation=reservation)
90-
if not reservation.is_failed():
90+
if not reservation.is_failed() and not ignore:
9191
reservation.service_reserve()
9292
except Exception as e:
9393
self.logger.error(traceback.format_exc())
@@ -201,21 +201,22 @@ def close_delegation(self, *, delegation: ABCDelegation):
201201
finally:
202202
delegation.unlock()
203203

204-
def close(self, *, reservation: ABCReservationMixin):
204+
def close(self, *, reservation: ABCReservationMixin, force: bool = False):
205205
"""
206206
Handles a close operation for the reservation.
207207
Client: perform local close operations and issue close request to
208208
authority.
209209
Broker: perform local close operations
210210
Authority: process a close request
211211
@param reservation reservation for which to perform close
212+
@param force force close
212213
@throws Exception
213214
"""
214215
try:
215216
reservation.lock()
216217
if not reservation.is_closed() and not reservation.is_closing():
217218
self.policy.close(reservation=reservation)
218-
reservation.close()
219+
reservation.close(force=force)
219220
self.plugin.get_database().update_reservation(reservation=reservation)
220221
reservation.service_close()
221222
except Exception as e:
@@ -932,7 +933,13 @@ def register_slice(self, *, slice_object: ABCSlice):
932933
self.slices.add(slice_object=slice_object)
933934

934935
try:
935-
self.plugin.get_database().add_slice(slice_object=slice_object)
936+
# Only add slice if it doesn't exist
937+
# Slice may already exist in cases where it had only one sliver, which was removed by a modify
938+
exists = self.plugin.get_database().get_slices(slice_id=slice_object.get_slice_id())
939+
if not len(exists):
940+
self.plugin.get_database().add_slice(slice_object=slice_object)
941+
else:
942+
self.plugin.get_database().update_slice(slice_object=slice_object)
936943
except Exception as e:
937944
self.slices.remove(slice_id=slice_object.get_slice_id())
938945
self.logger.error(traceback.format_exc())
@@ -989,7 +996,8 @@ def remove_reservation(self, *, rid: ID):
989996
if real is not None:
990997
try:
991998
real.lock()
992-
if real.is_closed() or real.is_failed() or real.get_state() == ReservationStates.CloseWait:
999+
if real.is_closed() or real.is_failed() or \
1000+
real.get_state() in [ReservationStates.CloseWait, ReservationStates.CloseFail]:
9931001
self.unregister_reservation(rid=rid)
9941002
else:
9951003
raise KernelException("Only reservations in failed, closed, or closewait state can be removed.")
@@ -1312,7 +1320,8 @@ def __unregister(self, *, reservation: ABCReservationMixin, slice_object: ABCSli
13121320
@param slice_object local slice object
13131321
@throws Exception
13141322
"""
1315-
if reservation.is_closed() or reservation.is_failed() or reservation.get_state() == ReservationStates.CloseWait:
1323+
if reservation.is_closed() or reservation.is_failed() or \
1324+
reservation.get_state() in [ReservationStates.CloseWait, ReservationStates.CloseFail]:
13161325
try:
13171326
slice_object.lock_slice()
13181327
slice_object.unregister(reservation=reservation)
@@ -1521,6 +1530,8 @@ def validate_delegation(self, *, delegation: ABCDelegation = None, did: str = No
15211530
if delegation is not None:
15221531
local = self.soft_validate_delegation(delegation=delegation)
15231532
if local is None:
1533+
if delegation.get_graph() is not None:
1534+
delegation.get_graph().delete_graph()
15241535
self.error(err="delegation not found", e=DelegationNotFoundException(did=did))
15251536
return local
15261537

fabric_cf/actor/core/kernel/kernel_wrapper.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,21 +152,22 @@ def fail_delegation(self, *, did: str, message: str):
152152
target = self.kernel.validate_delegation(did=did)
153153
self.kernel.fail_delegation(delegation=target, message=message)
154154

155-
def close(self, *, rid: ID):
155+
def close(self, *, rid: ID, force: bool = False):
156156
"""
157157
Closes the reservation, potentially initiating a close request to another
158158
actor. If the reservation has concrete resources bound to it, this method
159159
may return before all close operations have completed. Check the
160160
reservation state to determine when close completes.
161161
@param rid identifier of reservation to close
162+
@param force forced close
162163
@throws Exception in case of error
163164
"""
164165
if rid is None:
165166
raise KernelException(Constants.INVALID_ARGUMENT)
166167
target = self.kernel.validate(rid=rid)
167168
# NOTE: this call does not require access control check, since
168169
# it is executed in the context of the actor represented by KernelWrapper.
169-
self.kernel.close(reservation=target)
170+
self.kernel.close(reservation=target, force=force)
170171

171172
def close_delegation(self, *, did: str):
172173
"""

fabric_cf/actor/core/kernel/reservation.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ def clear_notice(self, clear_fail: bool=False):
237237
Clears all event notices associated with the reservation.
238238
"""
239239

240-
def close(self):
240+
def close(self, force: bool = False):
241241
"""
242242
Close a reservation
243243
"""
@@ -416,7 +416,7 @@ def is_bid_pending(self) -> bool:
416416
return self.bid_pending
417417

418418
def is_closed(self) -> bool:
419-
return self.state == ReservationStates.Closed
419+
return self.state == ReservationStates.Closed or self.state == ReservationStates.CloseFail
420420

421421
def is_closing(self) -> bool:
422422
return self.state == ReservationStates.CloseWait or self.pending_state == ReservationPendingStates.Closing
@@ -495,11 +495,11 @@ def ready(self):
495495
496496
@throws Exception thrown if the state is closed or failed
497497
"""
498-
if self.state == ReservationStates.Closed or self.state == ReservationStates.Failed:
498+
if self.state in [ReservationStates.Closed, ReservationStates.Failed, ReservationStates.CloseFail]:
499499
self.error(err="invalid Reservation")
500500

501-
def reserve(self, *, policy: ABCPolicy):
502-
return
501+
def reserve(self, *, policy: ABCPolicy) -> bool:
502+
return True
503503

504504
def setup(self):
505505
"""

0 commit comments

Comments
 (0)