diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 9f07095a..9e54eadc 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -9,6 +9,7 @@ on: - '**.md' - '.flake8' - '.pre-commit-config.yaml' + - '.readthedocs.yaml' jobs: test: diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index f4cdc15b..c833815f 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -8,6 +8,7 @@ on: - '**.md' - '.flake8' - '.pre-commit-config.yaml' + - '.readthedocs.yaml' workflow_call: jobs: diff --git a/.github/workflows/skipped-pytest.yml b/.github/workflows/skipped-pytest.yml index 400ca2d9..bd98b2db 100644 --- a/.github/workflows/skipped-pytest.yml +++ b/.github/workflows/skipped-pytest.yml @@ -8,6 +8,7 @@ on: - '**.md' - '.flake8' - '.pre-commit-config.yaml' + - '.readthedocs.yaml' jobs: test: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b0b8c5c2..686b85f1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,10 +7,10 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-autopep8 - rev: v2.0.1 + rev: v2.0.4 hooks: - id: autopep8 - repo: https://github.com/pycqa/flake8 - rev: 6.0.0 + rev: 6.1.0 hooks: - id: flake8 diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 9a17f94d..85c668b8 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,10 +1,14 @@ version: 2 +build: + os: ubuntu-22.04 + tools: + python: "3.10" + sphinx: configuration: docs/conf.py python: - version: 3.7 install: - requirements: docs/requirements.txt - requirements: requirements.txt diff --git a/docker/crossbar/Dockerfile b/docker/crossbar/Dockerfile index b479604e..2adb8885 100644 --- a/docker/crossbar/Dockerfile +++ b/docker/crossbar/Dockerfile @@ -8,8 +8,11 @@ FROM crossbario/crossbar:cpy3-20.8.1 # Run as root to put config in place and chown USER root -# Copy in config and requirements files -COPY config.json 
/ocs/.crossbar/config.json +# Copy in config template and wrapper script +COPY run-crossbar.sh /ocs/run-crossbar.sh +RUN chmod a+x /ocs/run-crossbar.sh + +COPY config.json.template /ocs/.crossbar/config-with-address.json.template RUN chown -R crossbar:crossbar /ocs # Run image as crossbar user during normal operation @@ -20,4 +23,4 @@ EXPOSE 8001 # Run crossbar when the container launches # User made config.json should be mounted to /ocs/.crossbar/config.json -ENTRYPOINT ["crossbar", "start", "--cbdir", "/ocs/.crossbar"] +ENTRYPOINT ["/ocs/run-crossbar.sh"] diff --git a/docker/crossbar/config.json b/docker/crossbar/config.json.template similarity index 92% rename from docker/crossbar/config.json rename to docker/crossbar/config.json.template index 5f1519a0..c632c48c 100644 --- a/docker/crossbar/config.json +++ b/docker/crossbar/config.json.template @@ -10,13 +10,13 @@ }, "realms": [ { - "name": "test_realm", + "name": "{realm}", "roles": [ { "name": "iocs_agent", "permissions": [ { - "uri": "observatory.", + "uri": "{address_root}.", "match": "prefix", "allow": { "call": true, @@ -36,7 +36,7 @@ "name": "iocs_controller", "permissions": [ { - "uri": "observatory.", + "uri": "{address_root}.", "match": "prefix", "allow": { "call": true, @@ -60,7 +60,7 @@ "type": "web", "endpoint": { "type": "tcp", - "port": 8001 + "port": {port} }, "paths": { "ws": { @@ -81,7 +81,7 @@ }, "call": { "type": "caller", - "realm": "test_realm", + "realm": "{realm}", "role": "iocs_controller", "options": { } diff --git a/docker/crossbar/run-crossbar.sh b/docker/crossbar/run-crossbar.sh new file mode 100644 index 00000000..f074afc5 --- /dev/null +++ b/docker/crossbar/run-crossbar.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Documented names are OCS_CROSSBAR_REALM / OCS_CROSSBAR_PORT; legacy OCS_REALM / OCS_PORT remain as fallbacks. +CONFIG_DIR=/ocs/.crossbar +OCS_ADDRESS_ROOT=${OCS_ADDRESS_ROOT:-observatory} +OCS_CROSSBAR_REALM=${OCS_CROSSBAR_REALM:-${OCS_REALM:-test_realm}} +OCS_CROSSBAR_PORT=${OCS_CROSSBAR_PORT:-${OCS_PORT:-8001}} + +# Did user mount in a config.json? 
+if [ -e "$CONFIG_DIR/config.json" ]; then + echo Launching user-provided config.json + CONFIG_FILE=$CONFIG_DIR/config.json +else + pattern=" + s/{address_root}/${OCS_ADDRESS_ROOT}/g + s/{realm}/${OCS_CROSSBAR_REALM}/g + s/{port}/${OCS_CROSSBAR_PORT}/g + " + echo "Processing template with replacements:" + echo "$pattern" + echo + + CONFIG_FILE=$CONFIG_DIR/config-with-address.json + sed "$pattern" \ + "$CONFIG_DIR/config-with-address.json.template" \ + > "$CONFIG_FILE" +fi +# exec: let crossbar replace this shell so it receives the container's stop signals directly. +exec crossbar start --cbdir "$CONFIG_DIR" --config "$CONFIG_FILE" diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index 31e624b2..2e3c63d8 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -90,7 +90,9 @@ to register a feed so that it will be recorded by the aggregator. Unregistered providers will automatically be added when they send data, and stale providers will be removed if no data is received in a specified time period. -To do this, the aggregator monitors all feeds in the ``observatory`` namespace to find + +To do this, the aggregator monitors all feeds in the namespace defined +by the `{address_root}` prefix to find feeds that should be recorded. 
If the aggregator receives data from a feed registered with ``record=True``, it will automatically add that feed as a Provider, and will start putting incoming data into frames every ``frame_length`` diff --git a/docs/developer/writing_an_agent/docker.rst b/docs/developer/writing_an_agent/docker.rst index 2e4a7eea..e8d97656 100644 --- a/docs/developer/writing_an_agent/docker.rst +++ b/docs/developer/writing_an_agent/docker.rst @@ -84,7 +84,6 @@ the BarbonesAgent config to the ``ocs-docker`` host.: wamp_http: http://localhost:8001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/docs/user/centralized_management.rst b/docs/user/centralized_management.rst index 97bf6a15..ad74fe66 100644 --- a/docs/user/centralized_management.rst +++ b/docs/user/centralized_management.rst @@ -476,6 +476,7 @@ should look something like this:: ExecStart=/home/ocs/git/ocs-site-configs/my-lab/launcher-hm-server5.sh User=ocs Restart=always + RestartSec=10s [Install] WantedBy=multi-user.target diff --git a/docs/user/crossbar_config.rst b/docs/user/crossbar_config.rst index dd1e5ac2..0a02a07a 100644 --- a/docs/user/crossbar_config.rst +++ b/docs/user/crossbar_config.rst @@ -12,48 +12,118 @@ the interface to the crossbar server. .. note:: - For most test deployments of OCS, you should not need to modify this file - and can use the one that comes with the ``simonsobs/ocs-crossbar`` Docker - Image. + For most simple lab deployments of OCS, you should not need to modify this + file and can use the one that comes with the ``simonsobs/ocs-crossbar`` + Docker Image. -Example Config --------------- -An example of the default OCS crossbar config that is bundled into -``simonsobs/ocs-crossbar`` can be found in the repository at -`ocs/docker/crossbar/config.json`_. This is based on the template in -`ocs/ocs/support/crossbar_config.json`_. 
+Configuration File Template +--------------------------- +The template that the default OCS crossbar config is built with is shown here: + +.. literalinclude:: ../../docker/crossbar/config.json.template -The unique parts of this to OCS are the realm name, "test_realm", defined -roles of "iocs_agent" and "iocs_controller, and "address_root" of -"observatory.". Additionally, we run on port 8001. +The variables `realm`, `address_root`, and `port` are all configurable and must +match configuration options set in your SCF. Keep reading this page to see how +to configure these variables. + +.. note:: + Changing the `address_root` has implications for how your data is + stored and accessed in tools such as Grafana. It is recommended you pick + something reasonable when you first configure your system and do not change it + later. For further details on crossbar server configuration, see the crossbar `Router Configuration`_ page. -.. _`ocs/docker/crossbar/config.json`: https://github.com/simonsobs/ocs/blob/main/docker/crossbar/config.json -.. _`ocs/ocs/support/crossbar_config.json`: https://github.com/simonsobs/ocs/blob/main/ocs/support/crossbar_config.json .. _`Router Configuration`: https://crossbar.io/docs/Router-Configuration/ +Running with Docker +=================== + +We recommend running crossbar within a Docker container. We build the +``simonsobs/ocs-crossbar`` container from the official `crossbar.io Docker +image`_, specifically the cpy3 version. Bundled within the container is a +simple crossbar configuration file template with defaults that are +compatible with examples in this documentation. + +To adjust the crossbar configuration in the container, you can either: + +- Use environment variables to alter the most basic settings +- Generate and mount a new configuration file over + ``/ocs/.crossbar/config.json`` with the proper permissions + +.. 
_`crossbar.io Docker image`: https://hub.docker.com/r/crossbario/crossbar + +Environment variables in ocs-crossbar +------------------------------------- +The following environment variables can be set, to affect the +generation of the crossbar configuration file when the container +starts up: + +- OCS_ADDRESS_ROOT (default "observatory"): the base URI for OCS + entities (this needs to match the `address_root` set in the SCF). +- OCS_CROSSBAR_REALM (default "test_realm"): the WAMP realm to + configure for OCS. +- OCS_CROSSBAR_PORT (default 8001): the port on which crossbar will + accept requests. + +Here is an example of a docker-compose entry that overrides the +OCS_ADDRESS_ROOT:: + + crossbar: + image: simonsobs/ocs-crossbar:latest + ports: + - "127.0.0.1:8001:8001" # expose for OCS + environment: + - PYTHONUNBUFFERED=1 + - OCS_ADDRESS_ROOT=laboratory + +Bind Mounting the Configuration +------------------------------- +To instead mount a new configuration into the pre-built image, first chown +your file to be owned by user and group 242 (the default crossbar UID/GID), +then mount it appropriately in your docker-compose file. Here we assume you +put the configuration in the directory ``./dot_crossbar/``:: + + $ chown -R 242:242 dot_crossbar/ + +.. note:: + If you do not already have a configuration file to modify and use, see the + next section on generating one. + +Your docker-compose service should then be configured like:: + + crossbar: + image: simonsobs/ocs-crossbar + ports: + - "8001:8001" # expose for OCS + volumes: + - ./dot_crossbar:/ocs/.crossbar + environment: + - PYTHONUNBUFFERED=1 + Generating a New Config File ---------------------------- -``ocsbow`` can be used to generate a default configuration file, based on -options in your OSC file, which can then be modified if needed. +``ocs-local-support`` can be used to generate a default configuration +file, based on options in your SCF, which can then be modified if +needed. 
First, we make sure our ``OCS_CONFIG_DIR`` environment variable is set:: $ cd ocs-site-configs/ $ export OCS_CONFIG_DIR=`pwd` -We should make a directory for the crossbar config, let's call it -``dot_crossbar/`` (typically a dot directory, but for visibility we'll avoid -that):: +We should make a directory for the crossbar config, following along above let's +call it ``dot_crossbar/`` (typically a dot directory, but for visibility we'll +avoid that):: $ mkdir -p ocs-site-configs/dot_crossbar/ This directory needs to be configured as your crossbar 'config-dir' in your -ocs-site-config file. Now we can generate the config:: +ocs-site-config file. (See example in :ref:`site_config_user`.) Now we can +generate the config:: - $ ocsbow crossbar generate_config + $ ocs-local-support generate_crossbar_config The crossbar config-dir is set to: ./dot_crossbar/ Using @@ -68,54 +138,7 @@ modifications needed for your deployment. .. note:: - The crossbar 'config-dir' block and the 'agent-instance' block defining the - 'HostManager' Agent are both required for the system you are running ocsbow - on. Be sure to add these to your SCF if they do not exist. - -Running with Docker -=================== - -We recommend running crossbar within a Docker container. We build the -``simonsobs/ocs-crossbar`` container from the official `crossbar.io Docker -image`_, specifically the cpy3 version. Bundled within the container is a -simple OCS configuration that should work with the configuration -recommendations in this documentation. - -If changes need to be made, then you will need to generate your own -configuration file as described above. To use a modified configuration in the -container you can either: - -- Edit the default configuration file and rebuild the Docker image -- Mount the new configuration file over ``/ocs/.crossbar/config.json`` with the - proper permissions - -.. 
_`crossbar.io Docker image`: https://hub.docker.com/r/crossbario/crossbar - -Rebuilding the Docker Image ---------------------------- -To rebuild the Docker image after modifying ``ocs/docker/config.json`` run:: - - $ docker build -t ocs-crossbar . - -You should then update your configuration to use the new, local, -``ocs-crossbar`` image. - -Bind Mounting the Configuration -------------------------------- -To instead mount the new configuration into the pre-built image, first chown -your file to be owned by user and group 242 (the default crossbar UID/GID), -then mount it appropriately in your docker-compose file. Here we assume you -put the configuration in the directory ``./dot_crossbar/``:: - - $ chown -R 242:242 dot_crossbar/ - -Your docker-compose service should then be configured like:: - - crossbar: - image: simonsobs/ocs-crossbar - ports: - - "8001:8001" # expose for OCS - volumes: - - ./dot_crossbar:/ocs/.crossbar - environment: - - PYTHONUNBUFFERED=1 + The crossbar 'config-dir' block and the 'agent-instance' block + defining the 'HostManager' Agent are both required for the system + you are running `ocs-local-support` on. Be sure to add these to + your SCF if they do not exist. diff --git a/docs/user/docker_config.rst b/docs/user/docker_config.rst index 90d27853..9d0bb626 100644 --- a/docs/user/docker_config.rst +++ b/docs/user/docker_config.rst @@ -61,7 +61,7 @@ components):: ports: - "127.0.0.1:8001:8001" # expose for OCS environment: - - PYTHONUNBUFFERED=1 + - PYTHONUNBUFFERED=1 # -------------------------------------------------------------------------- # OCS Agents @@ -242,7 +242,7 @@ Where the separate compose files would look something like:: ports: - "127.0.0.1:8001:8001" # expose for OCS environment: - - PYTHONUNBUFFERED=1 + - PYTHONUNBUFFERED=1 :: diff --git a/docs/user/quickstart.rst b/docs/user/quickstart.rst index bf42ab5d..3886a0a9 100644 --- a/docs/user/quickstart.rst +++ b/docs/user/quickstart.rst @@ -48,7 +48,6 @@ structure. 
wamp_http: http://localhost:8001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/docs/user/site_config.rst b/docs/user/site_config.rst index b6e0760c..a5023c8a 100644 --- a/docs/user/site_config.rst +++ b/docs/user/site_config.rst @@ -35,7 +35,6 @@ instances (running two different classes of agent): wamp_http: http://10.10.10.3:8001/call wamp_realm: test_realm address_root: observatory - registry_agent: observatory.registry hosts: @@ -116,6 +115,18 @@ The `hub` section defines the connection parameters for the crossbar server. This entire section will likely remain unchanged, except for the ``wamp_server`` and ``wamp_http`` IP addresses. +The `address_root` setting determines the leading token in all agent +and feed addresses on the crossbar network. While "observatory" is +the default, it can be changed as long as the crossbar configuration +is also updated to permit operations on the `{address_root}.` uri. + +.. warning:: + The hub settings must match the crossbar configuration. If you + change `wamp_realm` or `address_root`, especially, be sure to + update your crossbar configuration accordingly. (If using the + ocs-crossbar docker image, this can be done through environment + variables in the ``docker-compose.yaml`` file.) + Under `hosts` we have defined a three hosts, `host-1`, `host-1-docker`, and `host-2`. This configuration example shows a mix of Agents running directly on hosts and running within Docker containers. 
diff --git a/example/miniobs/default.yaml b/example/miniobs/default.yaml index 3b3d7a44..43710ee8 100644 --- a/example/miniobs/default.yaml +++ b/example/miniobs/default.yaml @@ -5,7 +5,6 @@ hub: wamp_http: http://localhost:8001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/ocs/agent_cli.py b/ocs/agent_cli.py index e3e2b786..bcbe6816 100755 --- a/ocs/agent_cli.py +++ b/ocs/agent_cli.py @@ -1,6 +1,7 @@ import argparse import importlib import os +import setproctitle import sys import warnings @@ -182,6 +183,10 @@ def main(args=None): mod = importlib.import_module(_module) + title = f'ocs-agent:{instance.data["instance-id"]}' + print(f'Renaming this process to: "{title}"') + setproctitle.setproctitle(title) + start = getattr(mod, entrypoint) # This is the start function. start(args=post_args) diff --git a/ocs/agents/aggregator/agent.py b/ocs/agents/aggregator/agent.py index 06a1bb34..3f16cb45 100644 --- a/ocs/agents/aggregator/agent.py +++ b/ocs/agents/aggregator/agent.py @@ -57,7 +57,7 @@ def __init__(self, agent, args): # If this ends up being too much data, we can add a tag '.record' # at the end of the address of recorded feeds, and filter by that. self.agent.subscribe_on_start(self._enqueue_incoming_data, - 'observatory..feeds.', + f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 36507b9a..f5922e7e 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -41,8 +41,8 @@ def _get_local_instances(self): Returns: agent_dict (dict): Maps instance-id to a dict describing the agent config. 
The config is as contained in - HostConfig.instances but where 'instance-id', - 'agent-class', and 'manage' are all guaranteed populated + HostConfig.instances but where 'instance-id', 'agent-class' + or 'agent-exe', and 'manage' are all guaranteed populated (and manage is one of ['yes', 'no', 'docker']). warnings: A list of strings, each of which corresponds to some problem found in the config. @@ -54,6 +54,10 @@ def _get_local_instances(self): self.site_config_file = site.source_file self.host_name = hc.name self.working_dir = hc.working_dir + self.wamp_url = site.hub.data['wamp_server'] + self.wamp_realm = site.hub.data['wamp_realm'] + self.address_root = site.hub.data['address_root'] + self.log_dir = hc.log_dir # Scan for agent scripts in (deprecated) script registry for p in hc.agent_paths: @@ -72,7 +76,7 @@ def _get_local_instances(self): continue # Make sure 'manage' is set, and valid. default_manage = 'no' \ - if inst['agent-class'] == 'HostManager' else 'yes' + if 'agent-class' in inst and inst['agent-class'] == 'HostManager' else 'yes' inst['manage'] = inst.get('manage', default_manage) if inst['manage'] not in ['yes', 'no', 'docker']: warnings.append( @@ -80,6 +84,17 @@ def _get_local_instances(self): f'for instance_id={inst["instance-id"]}.') continue + # Require exactly one of 'agent-class' / 'agent-exe' *before* recording the instance, so invalid entries are dropped + if inst.get('agent-class') is None and inst.get('agent-exe') is None: + warnings.append( + f'Configuration problem, neither agent-class nor agent-exe is set ' + f'for instance_id={inst["instance-id"]}.') + continue + if inst.get('agent-class') is not None and inst.get('agent-exe') is not None: + warnings.append( + f'Configuration problem, both agent-class and agent-exe are set ' + f'for instance_id={inst["instance-id"]}.') + continue instances[inst['instance-id']] = inst returnValue((instances, warnings)) yield @@ -169,23 +184,23 @@ def retire(db_key): docker_nonagents = list(self.docker_services.keys()) for iid, hinst in agent_dict.items(): - srv 
= self.docker_service_prefix + iid - cls = hinst['agent-class'] - mgmt = 'host' - if srv in docker_nonagents: - docker_nonagents.remove(srv) - cls += '[d]' - mgmt = 'docker' + record = dict(hinst) + record['srv'] = self.docker_service_prefix + iid + record['mgmt'] = 'host' + if record['srv'] in docker_nonagents: + docker_nonagents.remove(record['srv']) + record['agent-class'] += '[d]' + record['mgmt'] = 'docker' if hinst['manage'] != 'docker': session.add_message( f'The agent config for instance-id=' f'{iid} was matched to docker service ' - f'{srv}, but config does not specify ' + f'{record["srv"]}, but config does not specify ' f'manage:docker! Dropping both.') retire(iid) continue else: - srv = None + record['srv'] = None if hinst['manage'] == 'no': continue if hinst['manage'] == 'docker': @@ -195,36 +210,48 @@ def retire(db_key): f'in config. Dropping.') retire(iid) continue - new_managed.append((iid, iid, srv, cls, mgmt)) + record['db_key'] = iid + new_managed.append(record) for srv in docker_nonagents: - new_managed.append((srv, srv, srv, '[docker]', 'docker')) + new_managed.append({'db_key': srv, 'instance-id': srv, 'srv': srv, + 'agent-class': '[docker]', 'mgmt': 'docker'}) # Compare new managed items to stuff already in our database. - for db_key, iid, srv, cls, mgmt in new_managed: + for record in new_managed: + db_key = record['db_key'] instance = self.database.get(db_key, None) if instance is not None and \ instance['management'] == 'retired': instance = None if instance is not None: # So instance is some kind of actively managed container. 
- if (instance['agent_class'] != cls - or instance['management'] != mgmt): + if (instance['agent_class'] != record.get('agent-class') + or instance['agent_exe'] != record.get('agent-exe') + or instance['management'] != record.get('mgmt')): session.add_message( f'Managed agent "{db_key}" changed agent_class ' - f'({instance["agent_class"]} -> {cls}) or management ' - f'({instance["management"]} -> {mgmt}) and is being ' + f'({instance["agent_class"]} -> {record.get("agent-class")}) or agent_exe ' + f'({instance["agent_exe"]} -> {record.get("agent-exe")}) or management ' + f'({instance["management"]} -> {record.get("mgmt")}) and is being ' f'reset!') instance = None if instance is None: + if record.get("agent-class") is not None: + full_name = (f'{record["agent-class"]}:{record["db_key"]}') + else: + full_name = (f'{record["agent-exe"]}:{record["db_key"]}') instance = hm_utils.ManagedInstance.init( - management=mgmt, - instance_id=iid, - agent_class=cls, - full_name=(f'{cls}:{db_key}'), + management=record.get("mgmt"), + instance_id=record.get("instance-id"), + agent_class=record.get("agent-class"), + agent_exe=record.get("agent-exe"), + full_name=full_name, + agent_arguments=record.get("arguments"), + write_logs=record.get("write-logs", True) ) - if mgmt == 'docker': - instance['agent_script'] = srv + if record['mgmt'] == 'docker': + instance['agent_script'] = record['srv'] instance['prot'] = self._get_docker_helper(instance) if instance['prot'].status[0] is None: session.add_message( @@ -237,15 +264,17 @@ def retire(db_key): else: # Check for the agent class in the plugin system; # then check the (deprecated) agent script registry. 
- if cls in agent_plugins: - session.add_message(f'Found plugin for "{cls}"') + if record.get("agent-exe") is not None: + pass + elif record.get("agent-class") in agent_plugins: + session.add_message(f'Found plugin for "{record.get("agent-class")}"') instance['agent_script'] = '__plugin__' - elif cls in site_config.agent_script_reg: - session.add_message(f'Found launcher script for "{cls}"') - instance['agent_script'] = site_config.agent_script_reg[cls] + elif record.get("agent-class") in site_config.agent_script_reg: + session.add_message(f'Found launcher script for "{record.get("agent-class")}"') + instance['agent_script'] = site_config.agent_script_reg[record.get("agent-class")] else: session.add_message(f'No plugin (nor launcher script) ' - f'found for agent_class "{cls}"!') + f'found for agent_class "{record.get("agent-class")}"!') session.add_message(f'Tracking {instance["full_name"]}') self.database[db_key] = instance yield warnings @@ -303,18 +332,28 @@ def _launch_instance(self, instance): prot = self._get_docker_helper(instance) else: iid = instance['instance_id'] - pyth = sys.executable - script = instance['agent_script'] - if script == '__plugin__': - cmd = [pyth, '-m', 'ocs.agent_cli'] + if instance.get('agent_script') is not None: + pyth = sys.executable + script = instance['agent_script'] + if script == '__plugin__': + cmd = [pyth, '-m', 'ocs.agent_cli'] + else: + cmd = [pyth, script] + cmd.extend([ + '--instance-id', iid, + '--site-file', self.site_config_file, + '--site-host', self.host_name, + '--working-dir', self.working_dir]) + elif instance.get('agent_exe') is not None: + cmd = [instance['agent_exe'], '--address', self.address_root + '.' + iid, + '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] + if "agent_arguments" in instance: + cmd.extend(instance["agent_arguments"]) + if instance['write_logs'] and self.log_dir is not None: + log_file_path = self.log_dir + '/' + self.address_root + '.' 
+ iid + ".log" else: - cmd = [pyth, script] - cmd.extend([ - '--instance-id', iid, - '--site-file', self.site_config_file, - '--site-host', self.host_name, - '--working-dir', self.working_dir]) - prot = hm_utils.AgentProcessHelper(iid, cmd) + log_file_path = None + prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path) prot.up() instance['prot'] = prot @@ -601,9 +640,6 @@ def main(args=None): os.dup2(null, stream.fileno()) os.close(null) - # To reduce "try again" noise, don't tell Registry about HostManager. - args.registry_address = 'none' - agent, runner = ocs_agent.init_site_agent(args) docker_composes = [] diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index bc9d0ad1..47470e08 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -17,6 +17,8 @@ class ManagedInstance(dict): - 'agent_class' (str): The agent class. This will have special value 'docker' if the instance corresponds to a docker-compose service that has not been matched to a site_config entry. + - 'agent_exe' (str): The agent executable. This setting is mutually + exclusive with 'agent_class'. - 'instance_id' (str): The agent instance-id, or the docker service name if the instance is an unmatched docker-compose service. 
@@ -214,7 +216,7 @@ def stability_factor(times, window=120): class AgentProcessHelper(protocol.ProcessProtocol): - def __init__(self, instance_id, cmd): + def __init__(self, instance_id, cmd, log_file=None): super().__init__() self.status = None, None self.killed = False @@ -222,6 +224,7 @@ def __init__(self, instance_id, cmd): self.cmd = cmd self.lines = {'stderr': [], 'stdout': []} + self.log_file = open(log_file, "ab") if log_file is not None else None def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ) @@ -260,13 +263,19 @@ def inConnectionLost(self): def processExited(self, status): # print('%s.status:' % self.instance_id, status) self.status = status, time.time() + if self.log_file is not None: + self.log_file.flush() def outReceived(self, data): + if self.log_file is not None: + self.log_file.write(data) self.lines['stdout'].extend(data.decode('utf8').split('\n')) if len(self.lines['stdout']) > 100: self.lines['stdout'] = self.lines['stdout'][-100:] def errReceived(self, data): + if self.log_file is not None: + self.log_file.write(data) self.lines['stderr'].extend(data.decode('utf8').split('\n')) if len(self.lines['stderr']) > 100: self.lines['stderr'] = self.lines['stderr'][-100:] diff --git a/ocs/agents/influxdb_publisher/agent.py b/ocs/agents/influxdb_publisher/agent.py index aa274f3b..e35ef00a 100644 --- a/ocs/agents/influxdb_publisher/agent.py +++ b/ocs/agents/influxdb_publisher/agent.py @@ -50,7 +50,7 @@ def __init__(self, agent, args): self.loop_time = 1 self.agent.subscribe_on_start(self._enqueue_incoming_data, - 'observatory..feeds.', + f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') @@ -97,6 +97,7 @@ def record(self, session: ocs_agent.OpSession, params): port=self.args.port, protocol=self.args.protocol, gzip=self.args.gzip, + operate_callback=lambda: self.aggregate, ) session.set_status('running') diff --git a/ocs/agents/influxdb_publisher/drivers.py 
b/ocs/agents/influxdb_publisher/drivers.py index 7f09cba8..23fdc889 100644 --- a/ocs/agents/influxdb_publisher/drivers.py +++ b/ocs/agents/influxdb_publisher/drivers.py @@ -51,6 +51,9 @@ class Publisher: Protocol for writing data. Either 'line' or 'json'. gzip (bool, optional): compress influxdb requsts with gzip + operate_callback (callable, optional): + Function to call to see if failed connections should be + retried (to prevent a thread from locking). Attributes: host (str): @@ -66,7 +69,8 @@ class Publisher: """ - def __init__(self, host, database, incoming_data, port=8086, protocol='line', gzip=False): + def __init__(self, host, database, incoming_data, port=8086, protocol='line', + gzip=False, operate_callback=None): self.host = host self.port = port self.db = database @@ -88,6 +92,9 @@ def __init__(self, host, database, incoming_data, port=8086, protocol='line', gz LOG.error("Connection error, attempting to reconnect to DB.") self.client = InfluxDBClient(host=self.host, port=self.port, gzip=gzip) time.sleep(1) + if operate_callback and not operate_callback(): + break + db_names = [x['name'] for x in db_list] if self.db not in db_names: diff --git a/ocs/agents/registry/agent.py b/ocs/agents/registry/agent.py index fc409c76..f53d644d 100644 --- a/ocs/agents/registry/agent.py +++ b/ocs/agents/registry/agent.py @@ -98,7 +98,7 @@ def __init__(self, agent, args): self.agent_timeout = 5.0 # Removes agent after 5 seconds of no heartbeat. 
self.agent.subscribe_on_start( - self._register_heartbeat, 'observatory..feeds.heartbeat', + self._register_heartbeat, f'{args.address_root}..feeds.heartbeat', options={'match': 'wildcard'} ) @@ -132,9 +132,6 @@ def _publish_agent_ops(self, reg_agent): """ addr = reg_agent.agent_address - msg = {'block_name': addr, - 'timestamp': time.time(), - 'data': {}} self.log.debug(addr) for op_name, op_code in reg_agent.op_codes.items(): field = f'{addr}_{op_name}' @@ -146,8 +143,9 @@ def _publish_agent_ops(self, reg_agent): except ValueError as e: self.log.warn(f"Improper field name: {field}\n{e}") continue - msg['data'][field] = op_code - if msg['data']: + msg = {'block_name': field, + 'timestamp': time.time(), + 'data': {field: op_code}} self.agent.publish_to_feed('agent_operations', msg) @ocs_agent.param('test_mode', default=False, type=bool) diff --git a/ocs/checkdata.py b/ocs/checkdata.py index f3d21f03..b589f284 100644 --- a/ocs/checkdata.py +++ b/ocs/checkdata.py @@ -108,11 +108,11 @@ def _populate_instances(self): {FEED1: {"fields": {FIELD1: - {'full_name': 'observatory.INSTANCE_ID.feeds.FEED.FIELD', + {'full_name': 'ADDRESS_ROOT.INSTANCE_ID.feeds.FEED.FIELD', 't_last': float, 'v_last': float}, FIELD2: - {'full_name': 'observatory.INSTANCE_ID.feeds.FEED.FIELD', + {'full_name': 'ADDRESS_ROOT.INSTANCE_ID.feeds.FEED.FIELD', 't_last': float, 'v_last': float} }, diff --git a/ocs/client_cli.py b/ocs/client_cli.py index 7065f622..35b73724 100755 --- a/ocs/client_cli.py +++ b/ocs/client_cli.py @@ -62,7 +62,8 @@ def get_parser(): # scan p = client_sp.add_parser('scan', help="Gather and print list of Agents.") p.add_argument('--details', action='store_true', help="List all Operations with their current status OpCode.") - p.add_argument('--use-registry', action='store_true', help="Query the registry (faster than listening for heartbeats).") + p.add_argument('--use-registry', nargs='?', const='registry', help="Query the registry (faster than listening for heartbeats). 
" + "Pass the registry instance_id as an argument (default to 'registry').") # scan p = client_sp.add_parser('listen', help="Subscribe to feed(s) and dump to stdout.") @@ -122,9 +123,7 @@ def scan(parser, args): parser.error('Unable to find the OCS config; set OCS_CONFIG_DIR?') if args.use_registry: - reg_addr = args.registry_address - if reg_addr is None: - reg_addr = 'registry' + reg_addr = f'{args.address_root}.{args.use_registry}' try: c = OCSClient(get_instance_id(reg_addr, args), args=args) except RuntimeError as e: diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 97aa97d0..1efc9db1 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -4,7 +4,7 @@ txaio.use_twisted() from twisted.internet import reactor, task, threads -from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList, FirstError +from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList, FirstError, maybeDeferred from twisted.internet.error import ReactorNotRunning from twisted.python import log @@ -171,9 +171,16 @@ def onJoin(self, details): try: yield self.register(self._ops_handler, self.agent_address + '.ops') yield self.register(self._management_handler, self.agent_address) - except ApplicationError: - self.log.error('Failed to register basic handlers @ %s; ' - 'agent probably running already.' % self.agent_address) + except ApplicationError as e: + self.log.error('Failed to register basic handlers! ' + 'Error: {error}', error=e) + if e.error == 'wamp.error.not_authorized': + self.log.error('Are the WAMP realm and OCS address_root consistent ' + 'in OCS site config and crossbar config.json?') + elif e.error == 'wamp.error.procedure_already_exists': + self.log.error('Is this agent already running? 
' + 'agent_address="{agent_address}"', + agent_address=self.agent_address) self.leave() return @@ -196,8 +203,13 @@ def heartbeat(): self.heartbeat_call.start(1.0) # Calls the hearbeat every second # Subscribe to startup_subs + def _subscribe_fail(*args, **kwargs): + self.log.error('Failed to subscribe to a feed or feed pattern; possible configuration problem.') + self.log.error(str(args) + str(kwargs)) + self.leave() + for sub in self.startup_subs: - self.subscribe(**sub) + maybeDeferred(self.subscribe, **sub).addErrback(_subscribe_fail) # Now do the startup activities, only the first time we join if self.first_time_startup: @@ -682,8 +694,10 @@ def start(self, op_name, params=None): handler = ParamHandler(params) params = handler.batch(op.launcher._ocs_prescreen) except ParamError as err: + self.log.error("Caught ParamError during start call: {err}", err=err) return (ocs.ERROR, err.msg, {}) except Exception as err: + self.log.error("Caught Exception during start call: {err}", err=err) return (ocs.ERROR, f'CRASH: during param pre-processing: {str(err)}', {}) # Mark as started. @@ -1290,6 +1304,8 @@ def my_task(self, session, params): """ def __init__(self, params): + if params is None: + params = {} self._params = params self._checked = set() @@ -1377,6 +1393,9 @@ def get(self, key, default=ParamError(''), check=None, cast=None, type=None, # Free cast from int to float. 
if type is float and isinstance(value, int): value = float(value) + # Fix type after json conversion + if type is tuple and isinstance(value, list) and cast in [tuple, None]: + value = tuple(value) if not isinstance(value, type): raise ParamError(f"Param '{key}'={value} is not of required type ({type})") if choices is not None: diff --git a/ocs/ocs_systemd.py b/ocs/ocs_systemd.py index dbfc58c9..b22b50d3 100755 --- a/ocs/ocs_systemd.py +++ b/ocs/ocs_systemd.py @@ -28,6 +28,7 @@ ExecStart={cmd} User={service_user} Restart=always +RestartSec=10s {environment_lines} [Install] diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index 2915099f..a3c82933 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -156,16 +156,32 @@ def crossbar_test(args, site_config): '%s._crossbar_check_' % site.hub.data['address_root'], url=site.hub.data['wamp_http'], realm=site.hub.data['wamp_realm']) try: + # This is not expected to succeed, but the different errors + # tell us different things... client.call(client.agent_addr) except client_http.ControlClientError as ccex: suberr = ccex.args[0][4] - if suberr == 'client_http.error.connection_error': - ok, msg = False, 'http bridge not found at {wamp_http}.' - elif suberr == 'wamp.error.no_such_procedure': - ok, msg = True, 'http bridge reached at {wamp_http}.' + if suberr == 'wamp.error.no_such_procedure': + # This indicates we got through to the bridge, it liked + # the realm and our address_root. Return True. + ok, msg = True, 'http bridge reached at {wamp_http}.'.format(**site.hub.data) + elif suberr == 'client_http.error.connection_error': + # Possibly crossbar server is not running. + ok, msg = False, 'http bridge not found at {wamp_http}.'.format(**site.hub.data) + elif suberr == 'wamp.error.not_authorized': + # This is likely a configuration issue, print a banner and reraise it. 
+ print('***** crossbar / ocs configuration mismatch *****') + print('The exception here indicates a likely configuration mismatch issue') + print('with the crossbar server and OCS. Specifically, the WAMP realm and') + print('the OCS address_root must match between the site config file') + print('and the crossbar config.') + print('*****\n') + raise ccex else: - ok, msg = True, 'unexpected bridge connection problem; raised %s' % (str(ccex)) - return ok, msg.format(**site.hub.data) + # I think this case hasn't been encountered much. + print('***** unhandled error case *****\n') + raise ccex + return ok, msg def get_status(args, site_config, restrict_hosts=None): @@ -200,7 +216,7 @@ def get_status(args, site_config, restrict_hosts=None): if inst.get('manage') is None: inst['manage'] = 'yes' inst.update(blank_state) - if inst['agent-class'] == HOSTMANAGER_CLASS: + if 'agent-class' in inst and inst['agent-class'] == HOSTMANAGER_CLASS: sort_order = 0 hms.append(HostManagerManager( args, site_config, instance_id=inst['instance-id'])) @@ -235,7 +251,7 @@ def get_status(args, site_config, restrict_hosts=None): for cinfo in info['child_states']: this_id = cinfo['instance_id'] # Watch for [d] suffix, and steal it. 
- if cinfo['agent_class'].endswith('[d]'): + if cinfo.get('agent_class') is not None and cinfo['agent_class'].endswith('[d]'): agent_info[this_id]['agent-class'] = cinfo['agent_class'] if this_id in found: output['warnings'].append( @@ -282,23 +298,38 @@ def print_status(args, site_config): for hstat in status['hosts']: header = {'instance-id': '[instance-id]', - 'agent-class': '[agent-class]', + 'agent': '[agent]', 'current': '[state]', 'target': '[target]'} - field_widths = {'instance-id': 30, - 'agent-class': 20} + field_widths = {'instance-id': 20, + 'agent-class': 20, + 'agent-exe': 20} if len(hstat['agent_info']): - field_widths = {k: max(v0, max([len(v[k]) + field_widths = {k: max(v0, max([len(v[k]) if k in v and v[k] is not None else 0 for v in hstat['agent_info'].values()])) for k, v0 in field_widths.items()} - fmt = ' {instance-id:%i} {agent-class:%i} {current:>10} {target:>10}' % ( - field_widths['instance-id'], field_widths['agent-class']) + field_widths['agent'] = max(field_widths['agent-class'], field_widths['agent-exe']) + del field_widths['agent-class'] + del field_widths['agent-exe'] + fmt = ' {instance-id:%i} {agent:%i} {current:>10} {target:>10}' % ( + field_widths['instance-id'], field_widths['agent']) header = fmt.format(**header) print('-' * len(header)) print(f'Host: {hstat["host_name"]}\n') print(header) - for v in hstat['agent_info'].values(): - print(fmt.format(**v)) + + class FindAgentType(dict): + def __missing__(self, key): + if key == 'agent': + if 'agent-class' in self: + return self['agent-class'] + if 'agent-exe' in self: + return self['agent-exe'] + raise KeyError + hdata = sorted([FindAgentType(v) for v in hstat['agent_info'].values()], + key=lambda i: i['instance-id']) + for v in hdata: + print(fmt.format_map(v)) print() if len(status['warnings']): @@ -399,9 +430,11 @@ def generate_crossbar_config(cm, site_config): print(line, end='') print('\n') print('To adopt the new config, remove %s and re-run me.' 
% cb_filename) + print() else: open(cb_filename, 'w').write(config_text) print('Wrote %s' % cb_filename) + print() class CrossbarManager: @@ -789,6 +822,15 @@ def update(self): 'running, but should start if you run "ocs-local-support start".')) self.analysis = solutions + def fail_on_missing_crossbar_config(self): + """Check if crossbar is managed by this config. If not, print + error message and exit(1).""" + if not self.crossbar['manage']: + print('Error! Crossbar config file not set.\n\n' + 'To start crossbar or to generate a config file, the site ' + 'config file must have a "crossbar" entry; see docs.\n') + sys.exit(1) + def main(args=None): args, site_config = get_args_and_site_config(args) @@ -814,9 +856,9 @@ def main(args=None): status = get_status(args, site_config) for host_data in status['hosts']: active_hms = [v for v in host_data['agent_info'].values() - if v['agent-class'] == HOSTMANAGER_CLASS] + if v.get('agent-class') == HOSTMANAGER_CLASS] others = [v for v in host_data['agent_info'].values() - if v['agent-class'] != HOSTMANAGER_CLASS] + if v.get('agent-class') != HOSTMANAGER_CLASS] for inst in active_hms: if args.all or inst['instance-id'] in args.instance: hms.append(inst) @@ -953,8 +995,10 @@ def eligible(subsys): print('Trouble!') for text in fatals: print(_term_format(text, ' ', 4)) + sys.exit(1) if any([soln == 'crossbar' for soln, text in supports.analysis]): + supports.fail_on_missing_crossbar_config() print('Trying to start crossbar...') supports.crossbar['ctrl'].action('start', foreground=args.foreground) supports.update() # refresh .analysis @@ -995,5 +1039,6 @@ def eligible(subsys): print('No running crossbar detected, system is already "down".') elif action == 'generate_crossbar_config': + supports.fail_on_missing_crossbar_config() cm = supports.crossbar['ctrl'] generate_crossbar_config(cm, site_config) diff --git a/ocs/site_config.py b/ocs/site_config.py index 0315c207..ab53354a 100644 --- a/ocs/site_config.py +++ b/ocs/site_config.py 
@@ -187,12 +187,8 @@ def from_dict(cls, data, parent=None): ``address_root`` (required): The base address to be used by all OCS Agents. This is normally something simple like - ``observatory`` or ``detlab.system1``. (Command line - override: ``--address-root``.) - - ``registry_address`` (optional): The address of the OCS Registry - Agent. See :ref:`registry`. (Command line override: - ``--registry-address``.) + ``observatory`` or ``detlab``. (Command line override: + ``--address-root``.) """ self = cls() @@ -229,6 +225,12 @@ def from_dict(cls, data, parent=None): the Agent instance, as a way of finding the right InstanceConfig. + ``agent-exe`` (str, optional) + The Agent executable. This + may be matched against the agent_exe name provided by + the Agent instance, as a way of finding the right + InstanceConfig. + ``arguments`` (list, optional): A list of arguments that should be passed back to the agent. Historically the arguments have been grouped into @@ -250,6 +252,10 @@ def from_dict(cls, data, parent=None): self.manage = self.data.get('manage') if self.manage is None: self.manage = "yes" + if 'agent-class' not in self.data: + self.data['agent-class'] = None + if 'agent-exe' not in self.data: + self.data['agent-exe'] = None return self @@ -385,13 +391,13 @@ def add_arguments(parser=None): group.add_argument('--instance-id', help="""Look in the SCF for Agent-instance specific configuration options, and use those to launch the Agent.""") group.add_argument('--address-root', help="""Override the site default address root.""") - group.add_argument('--registry-address', help="""Override the site default registry address.""") + group.add_argument('--registry-address', help="""Deprecated.""") group.add_argument('--log-dir', help="""Set the logging directory.""") group.add_argument('--working-dir', help="""Propagate the working directory.""") return parser -def get_config(args, agent_class=None): +def get_config(args, agent_class=None, agent_exe=None): """ Args: args: 
The argument object returned by @@ -400,13 +406,15 @@ def get_config(args, agent_class=None): in this object. agent_class: Class name passed in to match against the list of device classes in each host's list. + agent_exe: Executable path or name passed in to match against the list of + agent executables in each host's list. Special values accepted for agent_class: - '*control*': do not insist on matching host or device. - '*host*': do not insist on matching device (but do match host). Returns: - The tuple (site_config, host_config, device_config). + The tuple (site_config, host_config, instance_config). """ if args.site == 'none': return (None, None, None) @@ -466,9 +474,6 @@ def get_config(args, agent_class=None): if args.site_realm is not None: site_config.hub.data['wamp_realm'] = args.site_realm - if args.registry_address is not None: - site_config.hub.data['registry_address'] = args.registry_address - # Identify our agent-instance. instance_config = None if no_dev_match: @@ -480,16 +485,26 @@ def get_config(args, agent_class=None): instance_config = InstanceConfig.from_dict( dev, parent=host_config) break - else: + elif agent_class is not None: # Use the agent_class to figure it out... for dev in host_config.instances: - if dev['agent-class'] == agent_class: + if 'agent-class' in dev and dev['agent-class'] == agent_class: if instance_config is not None: raise RuntimeError( f"Multiple matches found for agent-class={agent_class}" " ... you probably need to pass --instance-id=") instance_config = InstanceConfig.from_dict( dev, parent=host_config) + elif agent_exe is not None: + # Use the agent_exe to figure it out... + for dev in host_config.instances: + if 'agent-exe' in dev and dev['agent-exe'] == agent_exe: + if instance_config is not None: + raise RuntimeError( + f"Multiple matches found for agent-exe={agent_exe}" + " ... 
you probably need to pass --instance-id=") + instance_config = InstanceConfig.from_dict( + dev, parent=host_config) if instance_config is None and not no_dev_match: raise RuntimeError("Could not find matching device description.") return collections.namedtuple('SiteConfig', ['site', 'host', 'instance'])(site_config, host_config, instance_config) @@ -515,8 +530,6 @@ def add_site_attributes(args, site, host=None): args.site_realm = site.hub.data['wamp_realm'] if args.address_root is None: args.address_root = site.hub.data['address_root'] - if args.registry_address is None: - args.registry_address = site.hub.data.get('registry_address') if (args.log_dir is None) and (host is not None): args.log_dir = host.log_dir @@ -657,7 +670,7 @@ def scan_for_agents(do_registration=True): return items -def parse_args(agent_class=None, parser=None, args=None): +def parse_args(agent_class=None, agent_exe=None, parser=None, args=None): """ Function to parse site-config and agent arguments. This function takes site, host, and instance arguments into account by making sure the instance @@ -672,6 +685,11 @@ def parse_args(agent_class=None, parser=None, args=None): may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig. + agent_exe (str, optional): + Name of the Agent executable. This + may be matched against the agent_exe name provided by + the Agent instance, as a way of finding the right + InstanceConfig. parser (argparse.ArgumentParser, optional): Argument parser containing agent-specific arguments. If None, an empty parser will be created. 
@@ -705,7 +723,7 @@ def parse_args(agent_class=None, parser=None, args=None): pre_args, _ = pre_parser.parse_known_args(args=args) - site, host, instance = get_config(pre_args, agent_class=agent_class) + site, host, instance = get_config(pre_args, agent_class=agent_class, agent_exe=agent_exe) if instance is not None: # When the user omits instance_id, it can still be matched, @@ -743,7 +761,9 @@ def flatten(container): add_site_attributes(args, site, host=host) # Add agent_class attribute. - if not hasattr(args, 'agent_class'): + if not hasattr(args, 'agent_class') and agent_class is not None: setattr(args, 'agent_class', agent_class) + if not hasattr(args, 'agent_exe') and agent_exe is not None: + setattr(args, 'agent_exe', agent_exe) return args diff --git a/requirements.txt b/requirements.txt index aae6bca3..09aecc20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ twisted deprecation PyYAML importlib_metadata +setproctitle # InfluxDB Publisher influxdb diff --git a/requirements/testing.txt b/requirements/testing.txt index 4aa6e848..62b3147f 100644 --- a/requirements/testing.txt +++ b/requirements/testing.txt @@ -1,6 +1,6 @@ pytest pytest-cov docker -pytest-docker-compose +pytest-docker pytest-twisted so3g diff --git a/setup.py b/setup.py index b1b553ac..95eea52b 100644 --- a/setup.py +++ b/setup.py @@ -5,6 +5,10 @@ with open("README.rst", "r", encoding="utf-8") as fh: long_description = fh.read() +so3g_etxras = ["so3g"] +dev_extras = ["pytest", "pytest-twisted", "pytest-docker-compose", "pytest-cov", "coverage", "docker"] +dev_extras.extend(so3g_etxras) + setup(name='ocs', description='Observatory Control System', long_description=long_description, @@ -48,8 +52,11 @@ 'PyYAML', 'influxdb', 'numpy', + 'importlib_metadata;python_version<"3.10"', + 'setproctitle', ], extras_require={ - "so3g": ["so3g"], + "so3g": so3g_etxras, + "dev": dev_extras, }, ) diff --git a/tests/agents/test_registry_agent.py b/tests/agents/test_registry_agent.py index 
06c7f01c..6eb0339f 100644 --- a/tests/agents/test_registry_agent.py +++ b/tests/agents/test_registry_agent.py @@ -1,12 +1,13 @@ import time -import pytest import pytest_twisted from agents.util import create_session, create_agent_fixture from ocs.agents.registry.agent import RegisteredAgent, Registry, make_parser +from ocs import site_config parser = make_parser() +site_config.add_arguments(parser) args = parser.parse_args(['--wait-time', '0.1']) agent = create_agent_fixture(Registry, agent_kwargs=dict(args=args)) @@ -120,5 +121,4 @@ def test_registry_handles_changed_agent_ops(agent): # Add a new op to the registered agent reg_agent.op_codes = {'op_name': 1, 'new_op': 1} - with pytest.raises(Exception): - agent._publish_agent_ops(reg_agent) + agent._publish_agent_ops(reg_agent) diff --git a/tests/default.yaml b/tests/default.yaml index 4207c592..449ac234 100644 --- a/tests/default.yaml +++ b/tests/default.yaml @@ -5,7 +5,6 @@ hub: wamp_http: http://127.0.0.1:18001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/tests/integration/test_aggregator_agent_integration.py b/tests/integration/test_aggregator_agent_integration.py index 02dcb9bb..b0044943 100644 --- a/tests/integration/test_aggregator_agent_integration.py +++ b/tests/integration/test_aggregator_agent_integration.py @@ -11,8 +11,7 @@ from integration.util import ( create_crossbar_fixture ) - -pytest_plugins = ("docker_compose") +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture( diff --git a/tests/integration/test_crossbar_integration.py b/tests/integration/test_crossbar_integration.py index 7f2f044a..382c0cbe 100644 --- a/tests/integration/test_crossbar_integration.py +++ b/tests/integration/test_crossbar_integration.py @@ -8,9 +8,7 @@ from so3g import hk from integration.util import create_crossbar_fixture, restart_crossbar - -pytest_plugins = 
("docker_compose",) - +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() @@ -177,3 +175,6 @@ def test_proper_agent_shutdown_on_lost_transport(wait_for_crossbar): fake_data_container = client.containers.get('ocs-tests-fake-data-agent') assert fake_data_container.status == "exited" + + # Restart crossbar, else docker plugin loses track of it + crossbar_container.start() diff --git a/tests/integration/test_fake_data_agent_integration.py b/tests/integration/test_fake_data_agent_integration.py index 70aec7c4..aab999ac 100644 --- a/tests/integration/test_fake_data_agent_integration.py +++ b/tests/integration/test_fake_data_agent_integration.py @@ -11,11 +11,10 @@ from integration.util import ( create_crossbar_fixture, ) +from integration.util import docker_compose_file # noqa: F401 AGENT_PATH = '../ocs/agents/fake_data/agent.py' -pytest_plugins = ("docker_compose") - wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture( AGENT_PATH, 'fake_data') diff --git a/tests/integration/test_host_manager_agent_integration.py b/tests/integration/test_host_manager_agent_integration.py index c500decd..983181e2 100644 --- a/tests/integration/test_host_manager_agent_integration.py +++ b/tests/integration/test_host_manager_agent_integration.py @@ -14,9 +14,7 @@ from integration.util import ( create_crossbar_fixture ) - -pytest_plugins = ("docker_compose") - +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture('../ocs/agents/host_manager/agent.py', diff --git a/tests/integration/test_influxdb_publisher_integration.py b/tests/integration/test_influxdb_publisher_integration.py index faeeee05..a10af963 100644 --- a/tests/integration/test_influxdb_publisher_integration.py +++ b/tests/integration/test_influxdb_publisher_integration.py @@ -11,8 +11,7 @@ from integration.util import ( create_crossbar_fixture ) - 
-pytest_plugins = ("docker_compose") +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture( diff --git a/tests/integration/test_registry_agent_integration.py b/tests/integration/test_registry_agent_integration.py index ccb9f2d4..ccab10ef 100644 --- a/tests/integration/test_registry_agent_integration.py +++ b/tests/integration/test_registry_agent_integration.py @@ -9,11 +9,10 @@ from integration.util import ( create_crossbar_fixture ) +from integration.util import docker_compose_file # noqa: F401 from ocs.base import OpCode -pytest_plugins = ("docker_compose") - wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture('../ocs/agents/registry/agent.py', 'registry', diff --git a/tests/integration/util.py b/tests/integration/util.py index 07b853ce..fc847158 100644 --- a/tests/integration/util.py +++ b/tests/integration/util.py @@ -1,3 +1,5 @@ +import os + import pytest import docker @@ -7,13 +9,10 @@ def create_crossbar_fixture(): # Fixture to wait for crossbar server to be available. # Speeds up tests a bit to have this session scoped - # If tests start interfering with one another this should be changed to - # "function" scoped and session_scoped_container_getter should be changed - # to function_scoped_container_getter - # @pytest.fixture(scope="session") - # def wait_for_crossbar(session_scoped_container_getter): - @pytest.fixture(scope="function") - def wait_for_crossbar(function_scoped_container_getter): + # If tests interfere with eachother change to "function" scoped + # @pytest.fixture(scope="function") + @pytest.fixture(scope="session") + def wait_for_crossbar(docker_services): """Wait for the crossbar server from docker-compose to become responsive. 
@@ -29,3 +28,10 @@ def restart_crossbar(): crossbar_container = client.containers.get('ocs-tests-crossbar') crossbar_container.restart() check_crossbar_connection() + + +# Overrides the default location that pytest-docker looks for the compose file. +# https://pypi.org/project/pytest-docker/ +@pytest.fixture(scope="session") +def docker_compose_file(pytestconfig): + return os.path.join(str(pytestconfig.rootdir), "docker-compose.yml") diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py index f874f58d..8fe52fa9 100644 --- a/tests/test_aggregator.py +++ b/tests/test_aggregator.py @@ -424,7 +424,6 @@ def test_make_filename_directory_creation_no_subdirs(tmpdir): make_filename(test_dir, make_subdirs=False) -@patch('os.makedirs', side_effect=PermissionError('mocked permission error')) def test_make_filename_directory_creation_permissions(tmpdir): """make_filename() should raise a PermissionError if it runs into one when making the directories. @@ -433,6 +432,7 @@ def test_make_filename_directory_creation_permissions(tmpdir): """ test_dir = os.path.join(tmpdir, 'data') - with pytest.raises(PermissionError) as e_info: - make_filename(test_dir) - assert str(e_info.value) == 'mocked permission error' + with patch('os.makedirs', side_effect=PermissionError('mocked permission error')): + with pytest.raises(PermissionError) as e_info: + make_filename(test_dir) + assert str(e_info.value) == 'mocked permission error' diff --git a/tests/test_ocs_agent.py b/tests/test_ocs_agent.py index d300276c..53cc3d21 100644 --- a/tests/test_ocs_agent.py +++ b/tests/test_ocs_agent.py @@ -43,6 +43,12 @@ def tfunc_raise(session, a): return tfunc(session, a) +@param('test', default=1) +def tfunc_param_dec(session, a): + """tfunc but decorated with @ocs_agent.param.""" + return True, 'Task completed successfully' + + @pytest.fixture def mock_agent(): """Test fixture to setup a mocked OCSAgent. 
@@ -50,6 +56,7 @@ def mock_agent(): """ mock_config = MagicMock() mock_site_args = MagicMock() + mock_site_args.working_dir = "./" mock_site_args.log_dir = "./" a = OCSAgent(mock_config, mock_site_args, address='test.address') return a @@ -260,6 +267,19 @@ def test_start_unregistered_task(mock_agent): assert res[2] == {} +def test_start_task_none_params(mock_agent): + """Test passing params=None to task decorated with @param that has set + defaults. + + See issue: https://github.com/simonsobs/ocs/issues/251 + + """ + mock_agent.register_task('test_task', tfunc_param_dec) + res = mock_agent.start('test_task', params=None) + print(res) + assert res[0] == ocs.OK + + # Wait @pytest_twisted.inlineCallbacks def test_wait(mock_agent): @@ -502,12 +522,14 @@ def test_params_get(): 'float_param': 1e8, 'numerical_string_param': '145.12', 'none_param': None, + 'tuple_param': [20., 120.], # gets cast to list by json conversion }) # Basic successes params.get('int_param', type=int) params.get('string_param', type=str) params.get('float_param', type=float) + params.get('tuple_param', type=tuple) # Tricky successes params.get('int_param', type=float) diff --git a/tests/test_site_config.py b/tests/test_site_config.py index 0100374e..a2ed359d 100644 --- a/tests/test_site_config.py +++ b/tests/test_site_config.py @@ -28,8 +28,7 @@ def test_none_client_type_w_wamp_http_site(self): mock_site = MagicMock() mock_site.hub.data = {'wamp_http': 'http://127.0.0.1:8001', 'wamp_realm': 'test_realm', - 'address_root': 'observatory', - 'registry_address': 'observatory.registry'} + 'address_root': 'observatory'} get_control_client('test', site=mock_site, client_type=None) @@ -40,8 +39,7 @@ def test_none_client_type_wo_wamp_http_site(self): """ mock_site = MagicMock() mock_site.hub.data = {'wamp_realm': 'test_realm', - 'address_root': 'observatory', - 'registry_address': 'observatory.registry'} + 'address_root': 'observatory'} with pytest.raises(ValueError): get_control_client('test', 
site=mock_site, client_type=None)