3636from fabric_cf .actor .core .manage .messages .client_mng import ClientMng
3737from fabric_mb .message_bus .messages .proxy_avro import ProxyAvro
3838from fabric_cf .actor .core .util .id import ID
39+ from fabric_cf .actor .core .apis .abc_actor_mixin import ActorType
3940
4041if TYPE_CHECKING :
4142 from fabric_cf .actor .core .apis .abc_mgmt_actor import ABCMgmtActor
42- from fabric_cf .actor .core .apis .abc_actor_mixin import ABCActorMixin , ActorType
43+ from fabric_cf .actor .core .apis .abc_actor_mixin import ABCActorMixin
4344
4445
4546class RemoteActorCacheException (Exception ):
@@ -163,34 +164,38 @@ def check_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type: Acto
163164 """
164165 self .logger .debug (f"Check if Peer { peer_guid } /{ peer_type } already exists!" )
165166 try :
166- # For Broker/AM
167- if isinstance (mgmt_actor , ABCMgmtServerActor ):
168- self .logger .debug (f"Checking clients" )
169- clients = mgmt_actor .get_clients (guid = peer_guid )
170- self .logger .debug (f"clients -- { clients } { mgmt_actor .get_last_error ()} " )
171- if clients is not None :
172- self .logger .debug (f"Edge between { mgmt_actor .get_guid ()} and { peer_guid } exists (client)" )
173- return True
174-
175- # For Orchestrator/Broker
176- elif isinstance (mgmt_actor , ABCMgmtClientActor ):
167+ # For Broker - all AMs are added as proxies
168+ # For Orchestrator - all peers will be added as Proxies
169+ if isinstance (mgmt_actor , ABCMgmtClientActor ) and peer_type in [ActorType .Authority , ActorType .Broker ]:
177170 self .logger .debug (f"Checking brokers" )
178171 brokers = mgmt_actor .get_brokers (broker = peer_guid )
179172 self .logger .debug (f"brokers -- { brokers } " )
180173 if brokers is not None :
181174 self .logger .debug (f"Edge between { mgmt_actor .get_guid ()} and { peer_guid } exists (broker)" )
182175 return True
176+
177+ # For AM - all peers will be added as clients
178+ # For Broker - orchestrator as client
179+ elif isinstance (mgmt_actor , ABCMgmtServerActor ) and peer_type in [ActorType .Orchestrator , ActorType .Broker ]:
180+ self .logger .debug (f"Checking clients" )
181+ clients = mgmt_actor .get_clients (guid = peer_guid )
182+ self .logger .debug (f"clients -- { clients } { mgmt_actor .get_last_error ()} " )
183+ if clients is not None :
184+ self .logger .debug (f"Edge between { mgmt_actor .get_guid ()} and { peer_guid } exists (client)" )
185+ return True
183186 except Exception as e :
184187 raise RemoteActorCacheException (f"Unable to cast actor { mgmt_actor .get_guid ()} or { peer_guid } e={ e } " )
185188
186189 self .logger .debug (f"Edge between { mgmt_actor .get_guid ()} and { peer_guid } does not exist" )
187190 return False
188191
189- def establish_peer_private (self , * , mgmt_actor : ABCMgmtActor , peer_guid : ID , update : bool = False ) -> ClientMng :
192+ def establish_peer_private (self , * , mgmt_actor : ABCMgmtActor , peer_guid : ID , peer_type : ActorType ,
193+ update : bool = False ) -> ClientMng :
190194 """
191195 Establish connection i.e. create either proxies or clients between peer
192196 @param mgmt_actor mgmt_actor
193197 @param peer_guid peer_guid
198+ @param peer_type peer_type
194199 @param update update
195200 """
196201 self .logger .debug ("establish_peer_private IN" )
@@ -206,7 +211,7 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd
206211 if kafka_topic is None :
207212 raise RemoteActorCacheException (f"Actor { peer_guid } does not have a kafka topic" )
208213
209- if isinstance (mgmt_actor , ABCMgmtClientActor ):
214+ if isinstance (mgmt_actor , ABCMgmtClientActor ) and peer_type in [ ActorType . Authority , ActorType . Broker ] :
210215 proxy = ProxyAvro ()
211216 proxy .set_protocol (protocol )
212217 proxy .set_guid (str (identity .get_guid ()))
@@ -228,7 +233,7 @@ def establish_peer_private(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, upd
228233 except Exception as e :
229234 self .logger .error (e )
230235 self .logger .error (traceback .format_exc ())
231- elif isinstance (mgmt_actor , ABCMgmtServerActor ):
236+ elif isinstance (mgmt_actor , ABCMgmtServerActor ) and peer_type in [ ActorType . Orchestrator , ActorType . Broker ] :
232237 client = ClientMng ()
233238 client .set_name (name = cache_entry .get (self .actor_name ))
234239 client .set_guid (guid = str (peer_guid ))
@@ -261,7 +266,8 @@ def establish_peer(self, *, mgmt_actor: ABCMgmtActor, peer_guid: ID, peer_type:
261266 try :
262267 update = self .check_peer (mgmt_actor = mgmt_actor , peer_guid = peer_guid , peer_type = peer_type )
263268
264- client = self .establish_peer_private (mgmt_actor = mgmt_actor , peer_guid = peer_guid , update = update )
269+ client = self .establish_peer_private (mgmt_actor = mgmt_actor , peer_guid = peer_guid , peer_type = peer_type ,
270+ update = update )
265271
266272 self .check_to_remove_entry (guid = peer_guid )
267273
0 commit comments