-
Notifications
You must be signed in to change notification settings - Fork 7
A possible implementation for managing non-python agents #320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
64fe132
3bb7842
feab97f
e1fbd11
455df18
92371ff
6bfd154
91c51d5
919b966
d6e95ec
ff41bb8
0a0404d
93cc8b2
5edbde1
4b9fd3f
e7d331f
003260a
befdda6
4222f53
c8d1fc5
cf39d9c
4a981c5
28bc23d
c805bab
e1cea68
80bb45e
87b6dce
f1a29a3
e11331b
0f71bb5
4c7549f
9d31362
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,8 +41,8 @@ def _get_local_instances(self): | |
| Returns: | ||
| agent_dict (dict): Maps instance-id to a dict describing the | ||
| agent config. The config is as contained in | ||
| HostConfig.instances but where 'instance-id', | ||
| 'agent-class', and 'manage' are all guaranteed populated | ||
| HostConfig.instances but where 'instance-id', 'agent-class' | ||
| or 'agent-exe', and 'manage' are all guaranteed populated | ||
| (and manage is one of ['yes', 'no', 'docker']). | ||
| warnings: A list of strings, each of which corresponds to | ||
| some problem found in the config. | ||
|
|
@@ -54,6 +54,10 @@ def _get_local_instances(self): | |
| self.site_config_file = site.source_file | ||
| self.host_name = hc.name | ||
| self.working_dir = hc.working_dir | ||
| self.wamp_url = site.hub.data['wamp_server'] | ||
| self.wamp_realm = site.hub.data['wamp_realm'] | ||
| self.address_root = site.hub.data['address_root'] | ||
| self.log_dir = hc.log_dir | ||
|
|
||
| # Scan for agent scripts in (deprecated) script registry | ||
| for p in hc.agent_paths: | ||
|
|
@@ -72,14 +76,25 @@ def _get_local_instances(self): | |
| continue | ||
| # Make sure 'manage' is set, and valid. | ||
| default_manage = 'no' \ | ||
| if inst['agent-class'] == 'HostManager' else 'yes' | ||
| if 'agent-class' in inst and inst['agent-class'] == 'HostManager' else 'yes' | ||
| inst['manage'] = inst.get('manage', default_manage) | ||
| if inst['manage'] not in ['yes', 'no', 'docker']: | ||
| warnings.append( | ||
| f'Configuration problem, invalid manage={inst["manage"]} ' | ||
| f'for instance_id={inst["instance-id"]}.') | ||
| continue | ||
| instances[inst['instance-id']] = inst | ||
| # Make sure either 'agent-class' or 'agent-exe' is set, but not both | ||
| if 'agent-class' not in inst and 'agent-exe' not in inst: | ||
| warnings.append( | ||
| f'Configuration problem, neither agent-class nor agent-exe is set' | ||
| f'for instance_id={inst["instance-id"]}.') | ||
| continue | ||
| if inst.get('agent-class') is not None and inst.get('agent-exe') is not None: | ||
| warnings.append( | ||
| f'Configuration problem, both agent-class and agent-exe are set' | ||
| f'for instance_id={inst["instance-id"]}.') | ||
| continue | ||
| returnValue((instances, warnings)) | ||
| yield | ||
|
|
||
|
|
@@ -169,23 +184,23 @@ def retire(db_key): | |
| docker_nonagents = list(self.docker_services.keys()) | ||
|
|
||
| for iid, hinst in agent_dict.items(): | ||
| srv = self.docker_service_prefix + iid | ||
| cls = hinst['agent-class'] | ||
| mgmt = 'host' | ||
| if srv in docker_nonagents: | ||
| docker_nonagents.remove(srv) | ||
| cls += '[d]' | ||
| mgmt = 'docker' | ||
| record = dict(hinst) | ||
| record['srv'] = self.docker_service_prefix + iid | ||
| record['mgmt'] = 'host' | ||
| if record['srv'] in docker_nonagents: | ||
| docker_nonagents.remove(record['srv']) | ||
| record['agent-class'] += '[d]' | ||
| record['mgmt'] = 'docker' | ||
| if hinst['manage'] != 'docker': | ||
| session.add_message( | ||
| f'The agent config for instance-id=' | ||
| f'{iid} was matched to docker service ' | ||
| f'{srv}, but config does not specify ' | ||
| f'{record["srv"]}, but config does not specify ' | ||
| f'manage:docker! Dropping both.') | ||
| retire(iid) | ||
| continue | ||
| else: | ||
| srv = None | ||
| record['srv'] = None | ||
| if hinst['manage'] == 'no': | ||
| continue | ||
| if hinst['manage'] == 'docker': | ||
|
|
@@ -195,36 +210,48 @@ def retire(db_key): | |
| f'in config. Dropping.') | ||
| retire(iid) | ||
| continue | ||
| new_managed.append((iid, iid, srv, cls, mgmt)) | ||
| record['db_key'] = iid | ||
| new_managed.append(record) | ||
|
|
||
| for srv in docker_nonagents: | ||
| new_managed.append((srv, srv, srv, '[docker]', 'docker')) | ||
| new_managed.append({'db_key': srv, 'instance-id': srv, 'srv': srv, | ||
| 'agent-class': '[docker]', 'mgmt': 'docker'}) | ||
|
|
||
| # Compare new managed items to stuff already in our database. | ||
| for db_key, iid, srv, cls, mgmt in new_managed: | ||
| for record in new_managed: | ||
| db_key = record['db_key'] | ||
| instance = self.database.get(db_key, None) | ||
| if instance is not None and \ | ||
| instance['management'] == 'retired': | ||
| instance = None | ||
| if instance is not None: | ||
| # So instance is some kind of actively managed container. | ||
| if (instance['agent_class'] != cls | ||
| or instance['management'] != mgmt): | ||
| if (instance['agent_class'] != record.get('agent-class') | ||
| or instance['agent_exe'] != record.get('agent-exe') | ||
| or instance['management'] != record.get('mgmt')): | ||
| session.add_message( | ||
| f'Managed agent "{db_key}" changed agent_class ' | ||
| f'({instance["agent_class"]} -> {cls}) or management ' | ||
| f'({instance["management"]} -> {mgmt}) and is being ' | ||
| f'({instance["agent_class"]} -> {record.get("agent-class")}) or agent_exe ' | ||
| f'({instance["agent_exe"]} -> {record.get("agent-exe")}) or management ' | ||
| f'({instance["management"]} -> {record.get("mgmt")}) and is being ' | ||
| f'reset!') | ||
| instance = None | ||
| if instance is None: | ||
| if record.get("agent-class") is not None: | ||
| full_name = (f'{record["agent-class"]}:{record["db_key"]}') | ||
| else: | ||
| full_name = (f'{record["agent-exe"]}:{record["db_key"]}') | ||
| instance = hm_utils.ManagedInstance.init( | ||
| management=mgmt, | ||
| instance_id=iid, | ||
| agent_class=cls, | ||
| full_name=(f'{cls}:{db_key}'), | ||
| management=record.get("mgmt"), | ||
| instance_id=record.get("instance-id"), | ||
| agent_class=record.get("agent-class"), | ||
| agent_exe=record.get("agent-exe"), | ||
| full_name=full_name, | ||
| agent_arguments=record.get("arguments"), | ||
| write_logs=record.get("write-logs", False) | ||
| ) | ||
| if mgmt == 'docker': | ||
| instance['agent_script'] = srv | ||
| if record['mgmt'] == 'docker': | ||
| instance['agent_script'] = record['srv'] | ||
| instance['prot'] = self._get_docker_helper(instance) | ||
| if instance['prot'].status[0] is None: | ||
| session.add_message( | ||
|
|
@@ -237,15 +264,17 @@ def retire(db_key): | |
| else: | ||
| # Check for the agent class in the plugin system; | ||
| # then check the (deprecated) agent script registry. | ||
| if cls in agent_plugins: | ||
| session.add_message(f'Found plugin for "{cls}"') | ||
| if record.get("agent-exe") is not None: | ||
| pass | ||
| elif record.get("agent-class") in agent_plugins: | ||
| session.add_message(f'Found plugin for "{record.get("agent-class")}"') | ||
| instance['agent_script'] = '__plugin__' | ||
| elif cls in site_config.agent_script_reg: | ||
| session.add_message(f'Found launcher script for "{cls}"') | ||
| instance['agent_script'] = site_config.agent_script_reg[cls] | ||
| elif record.get("agent-class") in site_config.agent_script_reg: | ||
| session.add_message(f'Found launcher script for "{record.get("agent-class")}"') | ||
| instance['agent_script'] = site_config.agent_script_reg[record.get("agent-class")] | ||
| else: | ||
| session.add_message(f'No plugin (nor launcher script) ' | ||
| f'found for agent_class "{cls}"!') | ||
| f'found for agent_class "{record.get("agent-class")}"!') | ||
| session.add_message(f'Tracking {instance["full_name"]}') | ||
| self.database[db_key] = instance | ||
| yield warnings | ||
|
|
@@ -303,18 +332,28 @@ def _launch_instance(self, instance): | |
| prot = self._get_docker_helper(instance) | ||
| else: | ||
| iid = instance['instance_id'] | ||
| pyth = sys.executable | ||
| script = instance['agent_script'] | ||
| if script == '__plugin__': | ||
| cmd = [pyth, '-m', 'ocs.agent_cli'] | ||
| if instance.get('agent_script') is not None: | ||
| pyth = sys.executable | ||
| script = instance['agent_script'] | ||
| if script == '__plugin__': | ||
| cmd = [pyth, '-m', 'ocs.agent_cli'] | ||
| else: | ||
| cmd = [pyth, script] | ||
| cmd.extend([ | ||
| '--instance-id', iid, | ||
| '--site-file', self.site_config_file, | ||
| '--site-host', self.host_name, | ||
| '--working-dir', self.working_dir]) | ||
| elif instance.get('agent_exe') is not None: | ||
| cmd = [instance['agent_exe'], '--instance-id', self.address_root + '.' + iid, | ||
| '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] | ||
| if "agent_arguments" in instance: | ||
| cmd.extend(instance["agent_arguments"]) | ||
| if instance['write_logs']: | ||
| log_file_path = self.log_dir + '/' + self.address_root + '.' + iid + ".log" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If If this isn't set, consistent behavior with OCS would be to just not log.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for spotting that; it was definitely a bug. |
||
| else: | ||
| cmd = [pyth, script] | ||
| cmd.extend([ | ||
| '--instance-id', iid, | ||
| '--site-file', self.site_config_file, | ||
| '--site-host', self.host_name, | ||
| '--working-dir', self.working_dir]) | ||
| prot = hm_utils.AgentProcessHelper(iid, cmd) | ||
| log_file_path = None | ||
| prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path) | ||
| prot.up() | ||
| instance['prot'] = prot | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ class ManagedInstance(dict): | |
| - 'agent_class' (str): The agent class. This will have special | ||
| value 'docker' if the instance corresponds to a docker-compose | ||
| service that has not been matched to a site_config entry. | ||
| - 'agent_exe' (str): The agent executable. This setting is mutually | ||
| exclusive with 'agent_class'. | ||
| - 'instance_id' (str): The agent instance-id, or the docker | ||
| service name if the instance is an unmatched docker-compose | ||
| service. | ||
|
|
@@ -214,14 +216,15 @@ def stability_factor(times, window=120): | |
|
|
||
|
|
||
| class AgentProcessHelper(protocol.ProcessProtocol): | ||
| def __init__(self, instance_id, cmd): | ||
| def __init__(self, instance_id, cmd, log_file=None): | ||
| super().__init__() | ||
| self.status = None, None | ||
| self.killed = False | ||
| self.instance_id = instance_id | ||
| self.cmd = cmd | ||
| self.lines = {'stderr': [], | ||
| 'stdout': []} | ||
| self.log_file = open(log_file, "ab") if log_file is not None else None | ||
|
|
||
| def up(self): | ||
| reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ) | ||
|
|
@@ -231,6 +234,8 @@ def down(self): | |
| # race condition, but it could be worse. | ||
| if self.status[0] is None: | ||
| reactor.callFromThread(self.transport.signalProcess, 'INT') | ||
| if self.log_file is not None: | ||
| self.log_file.flush() | ||
|
||
|
|
||
| # See https://twistedmatrix.com/documents/current/core/howto/process.html | ||
| # | ||
|
|
@@ -262,11 +267,15 @@ def processExited(self, status): | |
| self.status = status, time.time() | ||
|
|
||
| def outReceived(self, data): | ||
| if self.log_file is not None: | ||
| self.log_file.write(data) | ||
| self.lines['stdout'].extend(data.decode('utf8').split('\n')) | ||
| if len(self.lines['stdout']) > 100: | ||
| self.lines['stdout'] = self.lines['stdout'][-100:] | ||
|
|
||
| def errReceived(self, data): | ||
| if self.log_file is not None: | ||
| self.log_file.write(data) | ||
| self.lines['stderr'].extend(data.decode('utf8').split('\n')) | ||
| if len(self.lines['stderr']) > 100: | ||
| self.lines['stderr'] = self.lines['stderr'][-100:] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand wanting to join address_root and iid, but can you call it
--agent-addressor--address? I don't like redefining--instance-id.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I called it
--instance-idsolely out of an attempt to better fit in with the surrounding code; my agents have always taken--addressand I later added this as an alias. So, if you prefer--address, so do I, and I'm happy to switch it. :)