Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
64fe132
A possible implementation for managing non-python agents
Feb 24, 2023
3bb7842
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 24, 2023
feab97f
Fix reference to removed variable
Feb 24, 2023
e1fbd11
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Mar 6, 2023
455df18
Merge pull request #321 from simonsobs/pre-commit-ci-update-config
BrianJKoopman Mar 7, 2023
92371ff
Separate blocks for each registered operation (#312)
jlashner Mar 8, 2023
6bfd154
Add importlib_metadata dependency for ocs-agent-cli
BrianJKoopman Mar 15, 2023
91c51d5
Merge pull request #323 from simonsobs/koopman/importlib-metadata-dep
BrianJKoopman Mar 15, 2023
919b966
Use procsettitle to have ocs-agent-cli rename itself
mhasself Apr 12, 2023
d6e95ec
Merge pull request #325 from simonsobs/set-title
BrianJKoopman Apr 12, 2023
ff41bb8
Fix readthedocs builds (#328)
BrianJKoopman May 10, 2023
0a0404d
Improve test/development portability (#319)
cnweaver May 11, 2023
93cc8b2
Revisit address_root != "observatory" (#327)
mhasself May 16, 2023
5edbde1
Cast tuple param before type check
BrianJKoopman May 22, 2023
4b9fd3f
Merge pull request #332 from simonsobs/koopman/issue-274
BrianJKoopman May 23, 2023
e7d331f
Support direct start calls with params to param decorated Tasks/Proce…
BrianJKoopman May 24, 2023
003260a
Replace pytest-docker-compose with pytest-docker plugin
BrianJKoopman Aug 9, 2023
befdda6
Merge pull request #342 from simonsobs/koopman/replace-pytest-docker-…
BrianJKoopman Aug 9, 2023
4222f53
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Jul 31, 2023
c8d1fc5
Merge pull request #340 from simonsobs/pre-commit-ci-update-config
BrianJKoopman Aug 9, 2023
cf39d9c
systemd service file: add RestartSec=10s
mhasself Aug 18, 2023
4a981c5
Merge pull request #344 from simonsobs/hm-service-robustness
BrianJKoopman Aug 23, 2023
28bc23d
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Aug 28, 2023
c805bab
Merge pull request #346 from simonsobs/pre-commit-ci-update-config
BrianJKoopman Aug 30, 2023
e1cea68
A possible implementation for managing non-python agents
Feb 24, 2023
80bb45e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 24, 2023
87b6dce
Fix reference to removed variable
Feb 24, 2023
f1a29a3
Merge branch 'non-python-agents' of https://github.com/cnweaver/ocs i…
Sep 14, 2023
e11331b
Call the combined address root and instance ID the address
Sep 14, 2023
0f71bb5
Turn off logging instead of crashing if log_dir is not set
Sep 14, 2023
4c7549f
Flush log file later to avoid confusing log delays
Sep 15, 2023
9d31362
Log executable agent output by default
Sep 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 81 additions & 42 deletions ocs/agents/host_manager/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def _get_local_instances(self):
Returns:
agent_dict (dict): Maps instance-id to a dict describing the
agent config. The config is as contained in
HostConfig.instances but where 'instance-id',
'agent-class', and 'manage' are all guaranteed populated
HostConfig.instances but where 'instance-id', 'agent-class'
or 'agent-exe', and 'manage' are all guaranteed populated
(and manage is one of ['yes', 'no', 'docker']).
warnings: A list of strings, each of which corresponds to
some problem found in the config.
Expand All @@ -54,6 +54,10 @@ def _get_local_instances(self):
self.site_config_file = site.source_file
self.host_name = hc.name
self.working_dir = hc.working_dir
self.wamp_url = site.hub.data['wamp_server']
self.wamp_realm = site.hub.data['wamp_realm']
self.address_root = site.hub.data['address_root']
self.log_dir = hc.log_dir

# Scan for agent scripts in (deprecated) script registry
for p in hc.agent_paths:
Expand All @@ -72,14 +76,25 @@ def _get_local_instances(self):
continue
# Make sure 'manage' is set, and valid.
default_manage = 'no' \
if inst['agent-class'] == 'HostManager' else 'yes'
if 'agent-class' in inst and inst['agent-class'] == 'HostManager' else 'yes'
inst['manage'] = inst.get('manage', default_manage)
if inst['manage'] not in ['yes', 'no', 'docker']:
warnings.append(
f'Configuration problem, invalid manage={inst["manage"]} '
f'for instance_id={inst["instance-id"]}.')
continue
instances[inst['instance-id']] = inst
# Make sure either 'agent-class' or 'agent-exe' is set, but not both
if 'agent-class' not in inst and 'agent-exe' not in inst:
warnings.append(
f'Configuration problem, neither agent-class nor agent-exe is set'
f'for instance_id={inst["instance-id"]}.')
continue
if inst.get('agent-class') is not None and inst.get('agent-exe') is not None:
warnings.append(
f'Configuration problem, both agent-class and agent-exe are set'
f'for instance_id={inst["instance-id"]}.')
continue
returnValue((instances, warnings))
yield

Expand Down Expand Up @@ -169,23 +184,23 @@ def retire(db_key):
docker_nonagents = list(self.docker_services.keys())

for iid, hinst in agent_dict.items():
srv = self.docker_service_prefix + iid
cls = hinst['agent-class']
mgmt = 'host'
if srv in docker_nonagents:
docker_nonagents.remove(srv)
cls += '[d]'
mgmt = 'docker'
record = dict(hinst)
record['srv'] = self.docker_service_prefix + iid
record['mgmt'] = 'host'
if record['srv'] in docker_nonagents:
docker_nonagents.remove(record['srv'])
record['agent-class'] += '[d]'
record['mgmt'] = 'docker'
if hinst['manage'] != 'docker':
session.add_message(
f'The agent config for instance-id='
f'{iid} was matched to docker service '
f'{srv}, but config does not specify '
f'{record["srv"]}, but config does not specify '
f'manage:docker! Dropping both.')
retire(iid)
continue
else:
srv = None
record['srv'] = None
if hinst['manage'] == 'no':
continue
if hinst['manage'] == 'docker':
Expand All @@ -195,36 +210,48 @@ def retire(db_key):
f'in config. Dropping.')
retire(iid)
continue
new_managed.append((iid, iid, srv, cls, mgmt))
record['db_key'] = iid
new_managed.append(record)

for srv in docker_nonagents:
new_managed.append((srv, srv, srv, '[docker]', 'docker'))
new_managed.append({'db_key': srv, 'instance-id': srv, 'srv': srv,
'agent-class': '[docker]', 'mgmt': 'docker'})

# Compare new managed items to stuff already in our database.
for db_key, iid, srv, cls, mgmt in new_managed:
for record in new_managed:
db_key = record['db_key']
instance = self.database.get(db_key, None)
if instance is not None and \
instance['management'] == 'retired':
instance = None
if instance is not None:
# So instance is some kind of actively managed container.
if (instance['agent_class'] != cls
or instance['management'] != mgmt):
if (instance['agent_class'] != record.get('agent-class')
or instance['agent_exe'] != record.get('agent-exe')
or instance['management'] != record.get('mgmt')):
session.add_message(
f'Managed agent "{db_key}" changed agent_class '
f'({instance["agent_class"]} -> {cls}) or management '
f'({instance["management"]} -> {mgmt}) and is being '
f'({instance["agent_class"]} -> {record.get("agent-class")}) or agent_exe '
f'({instance["agent_exe"]} -> {record.get("agent-exe")}) or management '
f'({instance["management"]} -> {record.get("mgmt")}) and is being '
f'reset!')
instance = None
if instance is None:
if record.get("agent-class") is not None:
full_name = (f'{record["agent-class"]}:{record["db_key"]}')
else:
full_name = (f'{record["agent-exe"]}:{record["db_key"]}')
instance = hm_utils.ManagedInstance.init(
management=mgmt,
instance_id=iid,
agent_class=cls,
full_name=(f'{cls}:{db_key}'),
management=record.get("mgmt"),
instance_id=record.get("instance-id"),
agent_class=record.get("agent-class"),
agent_exe=record.get("agent-exe"),
full_name=full_name,
agent_arguments=record.get("arguments"),
write_logs=record.get("write-logs", False)
)
if mgmt == 'docker':
instance['agent_script'] = srv
if record['mgmt'] == 'docker':
instance['agent_script'] = record['srv']
instance['prot'] = self._get_docker_helper(instance)
if instance['prot'].status[0] is None:
session.add_message(
Expand All @@ -237,15 +264,17 @@ def retire(db_key):
else:
# Check for the agent class in the plugin system;
# then check the (deprecated) agent script registry.
if cls in agent_plugins:
session.add_message(f'Found plugin for "{cls}"')
if record.get("agent-exe") is not None:
pass
elif record.get("agent-class") in agent_plugins:
session.add_message(f'Found plugin for "{record.get("agent-class")}"')
instance['agent_script'] = '__plugin__'
elif cls in site_config.agent_script_reg:
session.add_message(f'Found launcher script for "{cls}"')
instance['agent_script'] = site_config.agent_script_reg[cls]
elif record.get("agent-class") in site_config.agent_script_reg:
session.add_message(f'Found launcher script for "{record.get("agent-class")}"')
instance['agent_script'] = site_config.agent_script_reg[record.get("agent-class")]
else:
session.add_message(f'No plugin (nor launcher script) '
f'found for agent_class "{cls}"!')
f'found for agent_class "{record.get("agent-class")}"!')
session.add_message(f'Tracking {instance["full_name"]}')
self.database[db_key] = instance
yield warnings
Expand Down Expand Up @@ -303,18 +332,28 @@ def _launch_instance(self, instance):
prot = self._get_docker_helper(instance)
else:
iid = instance['instance_id']
pyth = sys.executable
script = instance['agent_script']
if script == '__plugin__':
cmd = [pyth, '-m', 'ocs.agent_cli']
if instance.get('agent_script') is not None:
pyth = sys.executable
script = instance['agent_script']
if script == '__plugin__':
cmd = [pyth, '-m', 'ocs.agent_cli']
else:
cmd = [pyth, script]
cmd.extend([
'--instance-id', iid,
'--site-file', self.site_config_file,
'--site-host', self.host_name,
'--working-dir', self.working_dir])
elif instance.get('agent_exe') is not None:
cmd = [instance['agent_exe'], '--instance-id', self.address_root + '.' + iid,
'--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand wanting to join address_root and iid, but can you call it --agent-address or --address? I don't like redefining --instance-id.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I called it --instance-id solely out of an attempt to better fit in with the surrounding code; my agents have always taken --address and I later added this as an alias. So, if you prefer --address, so do I, and I'm happy to switch it. :)

if "agent_arguments" in instance:
cmd.extend(instance["agent_arguments"])
if instance['write_logs']:
log_file_path = self.log_dir + '/' + self.address_root + '.' + iid + ".log"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If log_dir isn't defined in the SCF this'll throw:

2023-09-13T16:50:49-0400 Unhandled Error
Traceback (most recent call last):
  File "/home/koopman/git/ocs/ocs/agents/host_manager/agent.py", line 672, in main
    runner.run(agent, auto_reconnect=True)
  File "/home/koopman/.venv/user/lib/python3.11/site-packages/autobahn/twisted/wamp.py", line 439, in run
    reactor.run()
  File "/home/koopman/.venv/user/lib/python3.11/site-packages/twisted/internet/base.py", line 1318, in run
    self.mainLoop()
  File "/home/koopman/.venv/user/lib/python3.11/site-packages/twisted/internet/base.py", line 1328, in mainLoop
    reactorBaseSelf.runUntilCurrent()
--- <exception caught here> ---
  File "/home/koopman/.venv/user/lib/python3.11/site-packages/twisted/internet/base.py", line 967, in runUntilCurrent
    f(*a, **kw)
  File "/home/koopman/git/ocs/ocs/agents/host_manager/agent.py", line 353, in _launch_instance
    log_file_path = self.log_dir + '/' + self.address_root + '.' + iid + ".log"
builtins.TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'

If this isn't set, consistent behavior with OCS would be to just not log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting that; it was definitely a bug.

else:
cmd = [pyth, script]
cmd.extend([
'--instance-id', iid,
'--site-file', self.site_config_file,
'--site-host', self.host_name,
'--working-dir', self.working_dir])
prot = hm_utils.AgentProcessHelper(iid, cmd)
log_file_path = None
prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path)
prot.up()
instance['prot'] = prot

Expand Down
11 changes: 10 additions & 1 deletion ocs/agents/host_manager/drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class ManagedInstance(dict):
- 'agent_class' (str): The agent class. This will have special
value 'docker' if the instance corresponds to a docker-compose
service that has not been matched to a site_config entry.
- 'agent_exe' (str): The agent executable. This setting is mutually
exclusive with 'agent_class'.
- 'instance_id' (str): The agent instance-id, or the docker
service name if the instance is an unmatched docker-compose
service.
Expand Down Expand Up @@ -214,14 +216,15 @@ def stability_factor(times, window=120):


class AgentProcessHelper(protocol.ProcessProtocol):
def __init__(self, instance_id, cmd):
def __init__(self, instance_id, cmd, log_file=None):
super().__init__()
self.status = None, None
self.killed = False
self.instance_id = instance_id
self.cmd = cmd
self.lines = {'stderr': [],
'stdout': []}
self.log_file = open(log_file, "ab") if log_file is not None else None

def up(self):
reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ)
Expand All @@ -231,6 +234,8 @@ def down(self):
# race condition, but it could be worse.
if self.status[0] is None:
reactor.callFromThread(self.transport.signalProcess, 'INT')
if self.log_file is not None:
self.log_file.flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my brief tests, this flush happens too quickly -- before the INT is sent and processed. You should probably do a flush in processExited in addition to / instead of here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed; moving the flush to processExited seems to work much better.


# See https://twistedmatrix.com/documents/current/core/howto/process.html
#
Expand Down Expand Up @@ -262,11 +267,15 @@ def processExited(self, status):
self.status = status, time.time()

def outReceived(self, data):
    """Record a chunk of output received on the child's stdout.

    Args:
        data (bytes): Raw bytes delivered for the child's stdout.

    The raw bytes are written unchanged to ``self.log_file`` when one
    is set.  The decoded text is split on newlines and appended to
    ``self.lines['stdout']``, which is then trimmed to its most recent
    100 entries.

    """
    if self.log_file is not None:
        self.log_file.write(data)
    # A chunk may contain non-UTF-8 bytes or end in the middle of a
    # multi-byte character; substitute U+FFFD rather than raise from
    # inside the protocol callback.
    text = data.decode('utf8', errors='replace')
    self.lines['stdout'].extend(text.split('\n'))
    if len(self.lines['stdout']) > 100:
        self.lines['stdout'] = self.lines['stdout'][-100:]

def errReceived(self, data):
    """Record a chunk of output received on the child's stderr.

    Args:
        data (bytes): Raw bytes delivered for the child's stderr.

    The raw bytes are written unchanged to ``self.log_file`` when one
    is set.  The decoded text is split on newlines and appended to
    ``self.lines['stderr']``, which is then trimmed to its most recent
    100 entries.

    """
    if self.log_file is not None:
        self.log_file.write(data)
    # A chunk may contain non-UTF-8 bytes or end in the middle of a
    # multi-byte character; substitute U+FFFD rather than raise from
    # inside the protocol callback.
    text = data.decode('utf8', errors='replace')
    self.lines['stderr'].extend(text.split('\n'))
    if len(self.lines['stderr']) > 100:
        self.lines['stderr'] = self.lines['stderr'][-100:]
Expand Down
39 changes: 27 additions & 12 deletions ocs/ocsbow.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def get_status(args, site_config, restrict_hosts=None):
if inst.get('manage') is None:
inst['manage'] = 'yes'
inst.update(blank_state)
if inst['agent-class'] == HOSTMANAGER_CLASS:
if 'agent-class' in inst and inst['agent-class'] == HOSTMANAGER_CLASS:
sort_order = 0
hms.append(HostManagerManager(
args, site_config, instance_id=inst['instance-id']))
Expand Down Expand Up @@ -235,7 +235,7 @@ def get_status(args, site_config, restrict_hosts=None):
for cinfo in info['child_states']:
this_id = cinfo['instance_id']
# Watch for [d] suffix, and steal it.
if cinfo['agent_class'].endswith('[d]'):
if cinfo.get('agent_class') is not None and cinfo['agent_class'].endswith('[d]'):
agent_info[this_id]['agent-class'] = cinfo['agent_class']
if this_id in found:
output['warnings'].append(
Expand Down Expand Up @@ -282,23 +282,38 @@ def print_status(args, site_config):

for hstat in status['hosts']:
header = {'instance-id': '[instance-id]',
'agent-class': '[agent-class]',
'agent': '[agent]',
'current': '[state]',
'target': '[target]'}
field_widths = {'instance-id': 30,
'agent-class': 20}
field_widths = {'instance-id': 20,
'agent-class': 20,
'agent-exe': 20}
if len(hstat['agent_info']):
field_widths = {k: max(v0, max([len(v[k])
field_widths = {k: max(v0, max([len(v[k]) if k in v and v[k] is not None else 0
for v in hstat['agent_info'].values()]))
for k, v0 in field_widths.items()}
fmt = ' {instance-id:%i} {agent-class:%i} {current:>10} {target:>10}' % (
field_widths['instance-id'], field_widths['agent-class'])
field_widths['agent'] = max(field_widths['agent-class'], field_widths['agent-exe'])
del field_widths['agent-class']
del field_widths['agent-exe']
fmt = ' {instance-id:%i} {agent:%i} {current:>10} {target:>10}' % (
field_widths['instance-id'], field_widths['agent'])
header = fmt.format(**header)
print('-' * len(header))
print(f'Host: {hstat["host_name"]}\n')
print(header)
for v in hstat['agent_info'].values():
print(fmt.format(**v))

class FindAgentType(dict):
    """Dict that synthesizes an 'agent' entry for status formatting.

    Looking up the key 'agent' when it is not stored yields the
    'agent-class' value if that is present and not None, otherwise the
    'agent-exe' value.  Any other missing key (or 'agent' when neither
    source holds a value) raises KeyError naming the key.

    """

    def __missing__(self, key):
        if key == 'agent':
            # Skip entries that are present but None so an instance
            # configured only with agent-exe still gets a printable
            # value (None cannot be formatted with a width spec).
            for source in ('agent-class', 'agent-exe'):
                value = self.get(source)
                if value is not None:
                    return value
        raise KeyError(key)
hdata = sorted([FindAgentType(v) for v in hstat['agent_info'].values()],
key=lambda i: i['instance-id'])
for v in hdata:
print(fmt.format_map(v))
print()

if len(status['warnings']):
Expand Down Expand Up @@ -814,9 +829,9 @@ def main(args=None):
status = get_status(args, site_config)
for host_data in status['hosts']:
active_hms = [v for v in host_data['agent_info'].values()
if v['agent-class'] == HOSTMANAGER_CLASS]
if v.get('agent-class') == HOSTMANAGER_CLASS]
others = [v for v in host_data['agent_info'].values()
if v['agent-class'] != HOSTMANAGER_CLASS]
if v.get('agent-class') != HOSTMANAGER_CLASS]
for inst in active_hms:
if args.all or inst['instance-id'] in args.instance:
hms.append(inst)
Expand Down
Loading