Skip to content

Commit c31bac5

Browse files
authored
Merge pull request #263 from fabric-testbed/262.re_read_kafka_config
262.re read kafka config
2 parents 6f1d671 + 684f979 commit c31bac5

27 files changed

+339
-191
lines changed

fabric_cf/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__VERSION__ = "1.4.3"
1+
__VERSION__ = "1.4.4"

fabric_cf/actor/boot/configuration_processor.py

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -447,50 +447,30 @@ def process_peer(self, *, peer: Peer):
447447
@param peer peer
448448
@raises ConfigurationException in case of error
449449
"""
450-
from_guid = ID(uid=peer.get_guid())
451-
from_type = ActorType.get_actor_type_from_string(actor_type=peer.get_type())
452-
to_guid = self.actor.get_guid()
453-
to_type = self.actor.get_type()
454-
455-
# We only like peers broker->site and orchestrator->broker
456-
# Reverse the peer if it connects site->broker or broker->orchestrator
457-
458-
if from_type == ActorType.Authority and to_type == ActorType.Broker:
459-
from_guid, to_guid = to_guid, from_guid
460-
from_type, to_type = to_type, from_type
461-
462-
if from_type == ActorType.Broker and to_type == ActorType.Orchestrator:
463-
from_guid, to_guid = to_guid, from_guid
464-
from_type, to_type = to_type, from_type
465-
466-
if from_type == ActorType.Authority and to_type == ActorType.Orchestrator:
467-
from_guid, to_guid = to_guid, from_guid
468-
from_type, to_type = to_type, from_type
450+
peer_guid = ID(uid=peer.get_guid())
451+
peer_type = ActorType.get_actor_type_from_string(actor_type=peer.get_type())
452+
actor_guid = self.actor.get_guid()
453+
actor_type = self.actor.get_type()
469454

470455
# peers between actors of same type aren't allowed unless the actors are both brokers
471-
if from_type == to_type and from_type != ActorType.Broker:
456+
if peer_type == actor_type and peer_type != ActorType.Broker:
472457
raise ConfigurationException(
473458
"Invalid peer type: broker can only talk to broker, orchestrator or site authority")
474459

475460
container = ManagementUtils.connect(caller=self.actor.get_identity())
476-
to_mgmt_actor = container.get_actor(guid=to_guid)
477-
self.logger.debug(f"to_mgmt_actor={to_mgmt_actor} to_guid={to_guid}")
478-
if to_mgmt_actor is None and container.get_last_error() is not None:
461+
mgmt_actor = container.get_actor(guid=actor_guid)
462+
self.logger.info(f"Management Actor: {mgmt_actor} === {type(mgmt_actor)}")
463+
if mgmt_actor is None and container.get_last_error() is not None:
479464
self.logger.error(container.get_last_error())
480-
from_mgmt_actor = container.get_actor(guid=from_guid)
481-
self.logger.debug(f"from_mgmt_actor={from_mgmt_actor} from_guid={from_guid}")
482-
if from_mgmt_actor is None and container.get_last_error() is not None:
483-
self.logger.error(container.get_last_error())
484465

485466
self.vertex_to_registry_cache(peer=peer)
486467

487468
try:
488-
client = RemoteActorCacheSingleton.get().establish_peer(from_guid=from_guid,
489-
from_mgmt_actor=from_mgmt_actor,
490-
to_guid=to_guid, to_mgmt_actor=to_mgmt_actor)
469+
client = RemoteActorCacheSingleton.get().establish_peer(mgmt_actor=mgmt_actor, peer_guid=peer_guid,
470+
peer_type=peer_type)
491471
self.logger.debug(f"Client returned {client}")
492472
if client is not None:
493-
self.parse_exports(peer=peer, client=client, mgmt_actor=to_mgmt_actor)
473+
self.parse_exports(peer=peer, client=client, mgmt_actor=mgmt_actor)
494474
except Exception as e:
495475
raise ConfigurationException(f"Could not process exports from: {peer.get_guid()} to "
496476
f"{self.actor.get_guid()}. e= {e}")

fabric_cf/actor/core/apis/abc_client_actor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ def add_broker(self, *, broker: ABCBrokerProxy):
6363
@params broker broker to register
6464
"""
6565

66+
@abstractmethod
67+
def update_broker(self, *, broker: ABCBrokerProxy):
68+
"""
69+
Registers a broker. If this is the first broker to be registered, it is
70+
set as the default broker.
71+
72+
@params broker broker to register
73+
"""
74+
6675
@abstractmethod
6776
def demand(self, *, rid: ID):
6877
"""

fabric_cf/actor/core/apis/abc_client_actor_management_object.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,15 @@ def add_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro:
116116
@return success or failure status
117117
"""
118118

119+
@abstractmethod
120+
def update_broker(self, *, broker: ProxyAvro, caller: AuthToken) -> ResultAvro:
121+
"""
122+
Update a broker
123+
@param broker: broker_proxy to be added
124+
@param caller: caller
125+
@return success or failure status
126+
"""
127+
119128
@abstractmethod
120129
def get_broker_query_model(self, *, broker: ID, caller: AuthToken, id_token: str,
121130
level: int, graph_format: GraphFormat) -> ResultBrokerQueryModelAvro:

fabric_cf/actor/core/apis/abc_mgmt_client_actor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ def add_broker(self, *, broker: ProxyAvro) -> bool:
107107
@return true for sucess; false otherwise
108108
"""
109109

110+
@abstractmethod
111+
def update_broker(self, *, broker: ProxyAvro) -> bool:
112+
"""
113+
Update an existing broker.
114+
@param broker broker
115+
@return true for sucess; false otherwise
116+
"""
117+
110118
@abstractmethod
111119
def get_broker_query_model(self, *, broker: ID, id_token: str, level: int,
112120
graph_format: GraphFormat) -> BrokerQueryModelAvro:

fabric_cf/actor/core/apis/abc_mgmt_server_actor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,15 @@ def register_client(self, *, client: ClientMng, kafka_topic: str) -> bool:
104104
@return true for success; false otherwise
105105
"""
106106

107+
@abstractmethod
108+
def update_client(self, *, client: ClientMng, kafka_topic: str) -> bool:
109+
"""
110+
Update a client
111+
@param client client
112+
@param kafka_topic Kafka topic
113+
@return true for success; false otherwise
114+
"""
115+
107116
@abstractmethod
108117
def unregister_client(self, *, guid: ID) -> bool:
109118
"""

fabric_cf/actor/core/apis/abc_server_actor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ def register_client(self, *, client: Client):
7373
@throws Exception in case of error
7474
"""
7575

76+
@abstractmethod
77+
def update_client(self, *, client: Client):
78+
"""
79+
Update the specified client.
80+
@param client client to register
81+
@throws Exception in case of error
82+
"""
83+
7684
@abstractmethod
7785
def unregister_client(self, *, guid: ID):
7886
"""

0 commit comments

Comments
 (0)