Skip to content

Commit

Permalink
Merge branch “usermode”
Browse files Browse the repository at this point in the history
  • Loading branch information
augu5te committed Oct 23, 2024
2 parents d41ca47 + e0457e1 commit 0acd95f
Show file tree
Hide file tree
Showing 12 changed files with 1,517 additions and 57 deletions.
11 changes: 10 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
python-jose
passlib
bcrypt
tomli
];
};
packageName = "oar";
Expand Down Expand Up @@ -119,12 +120,20 @@
simpy
redis
clustershell
tomli
];
in
pkgs.mkShell {
packages = with pkgs; [ pre-commit ] ++ pythonEnv;
};

oarShell = let
pythonEnv = with pkgs.python3Packages; [
self.defaultPackage.${system}
];
in
pkgs.mkShell {
packages = with pkgs; [ litecli ] ++ pythonEnv;
};
};

});
Expand Down
113 changes: 113 additions & 0 deletions oar/cli/oar_usermode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import os
import shutil
import sys

import click
from alembic.migration import MigrationContext
from alembic.operations import Operations
from ClusterShell.NodeSet import NodeSet
from sqlalchemy import Column, Integer
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import sessionmaker

import oar.tools as tools
from oar.lib.globals import init_oar
from oar.lib.models import DeferredReflectionModel, Model, Queue, Resource


def db_create(nodes, nb_core, skip):
    """Create and populate the database used when OAR runs in user mode.

    The function creates every table declared on ``Model.metadata``, adds
    nullable integer ``core`` and ``cpu`` columns to the ``resources`` table,
    creates the ``default`` and ``admin`` queues, and finally inserts
    ``nb_core`` resources for each node, numbering the ``core`` field
    consecutively starting at 1 across all nodes.

    Args:
        nodes: Node set expression passed to ClusterShell's ``NodeSet``
            (for example ``"node[1,6-7]"``).
        nb_core: Number of resources to create for each node.
        skip: When true, the first node of the node set is left out.

    Raises:
        SystemExit: With status 1 when ``USER_MODE`` is ``"NO"`` in the
            loaded configuration.
    """
    config, engine = init_oar(no_reflect=True)

    if config["USER_MODE"] == "NO":
        print("Error: user mode not enabled,", file=sys.stderr)
        print('hint: put USER_MODE="YES" in configuration file', file=sys.stderr)
        # sys.exit() rather than the interactive-only ``exit`` helper that the
        # ``site`` module injects (absent under ``python -S`` / frozen builds).
        sys.exit(1)

    Model.metadata.create_all(bind=engine)

    column_options = {"nullable": True}

    # Scope the connection so it is returned to the pool once the schema
    # change is done; engine.dispose() does not close checked-out connections.
    with engine.connect() as conn:
        context = MigrationContext.configure(conn)
        try:
            with context.begin_transaction():
                op = Operations(context)
                op.add_column("resources", Column("core", Integer, **column_options))
                op.add_column("resources", Column("cpu", Integer, **column_options))
        except ProgrammingError:
            # If the columns already exist we continue.
            # NOTE(review): some backends may report a duplicate column with a
            # different exception class (e.g. OperationalError) -- confirm.
            pass

    session_factory = sessionmaker(bind=engine)

    DeferredReflectionModel.prepare(engine)

    with session_factory() as session:
        Queue.create(
            session,
            name="default",
            priority=0,
            scheduler_policy="kamelot",
            state="Active",
        )
        Queue.create(
            session,
            name="admin",
            priority=100,
            scheduler_policy="kamelot",
            state="Active",
        )

        nodeset = NodeSet(nodes)
        if skip:
            # The first node is usually reserved for the OAR services.
            nodeset = nodeset[1:]

        # ``core`` is a running identifier shared by all nodes, not a
        # per-node index.
        core = 1
        for node in nodeset:
            for _ in range(nb_core):
                Resource.create(session, network_address=node, core=core)
                core += 1
        session.commit()

    engine.dispose()


@click.command()
@click.option("-c", "--create-db", is_flag=True, help="Create database")
@click.option(
    "-b",
    "--base-configfile",
    is_flag=True,
    help="Copy base configuration file ('oar_usermode.conf')",
)
@click.option(
    "-n",
    "--nodes",
    type=click.STRING,
    help="nodes to declare in database following nodeset format (ex: node[1,6-7])",
)
@click.option(
    "-s",
    "--skip",
    is_flag=True,
    help="skip the first node from nodes (usually reserved to OAR services)",
)
@click.option(
    "-o",
    "--nb-core",
    type=click.INT,
    # os.cpu_count() returns None when the count cannot be determined; fall
    # back to one core so that range(nb_core) in db_create() never gets None.
    default=os.cpu_count() or 1,
    help="Number of cores for each node",
)
def cli(base_configfile, create_db, nodes, skip, nb_core):
    """Prepare an OAR user-mode setup (base configuration file and/or database)."""
    # NOTE: click displays the docstring above in the ``--help`` output.
    if base_configfile:
        # Copy the packaged configuration file into the current working
        # directory under the same name.
        oar_configfile = "oar_usermode.conf"
        shutil.copyfile(tools.get_absolute_script_path(oar_configfile), oar_configfile)
    if create_db:
        if not nodes:
            # Node set used when --nodes is not given.
            nodes = "node[1-5]"
        db_create(nodes, nb_core, skip)
9 changes: 7 additions & 2 deletions oar/lib/configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import os
import sys
from io import open
from typing import Any
Expand Down Expand Up @@ -125,6 +126,8 @@ class Configuration(dict):
"ENERGY_SAVING_NODE_MANAGER_WAKEUP_TIMEOUT": 900,
"ENERGY_MAX_CYCLES_UNTIL_REFRESH": 5000,
"ENERGY_SAVING_NODES_KEEPALIVE": "type='default':0",
# User level mode control
"USER_MODE": "NO",
}

def __init__(self, defaults=None):
Expand All @@ -133,7 +136,10 @@ def __init__(self, defaults=None):
dict.__init__(self, defaults)

def load_default_config(self, silent=True):
self.load_file(self.DEFAULT_CONFIG_FILE, silent=silent)
config_file = self.DEFAULT_CONFIG_FILE
if "OAR_CONFIG_FILE" in os.environ:
config_file = os.environ["OAR_CONFIG_FILE"]
self.load_file(config_file, silent=silent)

def load_file(
self,
Expand Down Expand Up @@ -208,7 +214,6 @@ def get_sqlalchemy_uri(self, read_only: bool = False): # pragma: no cover
raise InvalidConfiguration("Cannot find %s" % keys)

def setdefault_config(self, default_config: dict[str, Any]):
# import pdb; pdb.set_trace()
for k, v in default_config.items():
self.setdefault(k, v)

Expand Down
2 changes: 2 additions & 0 deletions oar/lib/job_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2401,6 +2401,7 @@ def job_finishing_sequence(session, config, epilogue_script, job_id, events):
("deploy" not in job_types.keys())
and ("cosystem" not in job_types.keys())
and ("noop" not in job_types.keys())
and (config["USER_MODE"] == "NO")
):
###############
# CPUSET PART #
Expand Down Expand Up @@ -2529,6 +2530,7 @@ def job_finishing_sequence(session, config, epilogue_script, job_id, events):
and (config["ACTIVATE_PINGCHECKER_AT_JOB_END"] == "yes")
and ("deploy" not in job_types.keys())
and ("noop" not in job_types.keys())
and (config["USER_MODE"] == "NO")
):
hosts = get_job_current_hostnames(job_id)
logger.debug(
Expand Down
14 changes: 4 additions & 10 deletions oar/lib/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ def create_almighty_socket(server_hostname: str, server_port: str): # pragma: n
def notify_almighty(
cmd: str, job_id: Optional[int] = None, args: Optional[List[str]] = None
) -> bool: # pragma: no cover

if not almighty_socket:
create_almighty_socket(
config["SERVER_HOSTNAME"], config["APPENDICE_SERVER_PORT"]
Expand Down Expand Up @@ -203,17 +202,12 @@ def notify_interactif_user(job, message): # pragma: no cover
def notify_tcp_socket(addr, port, message): # pragma: no cover
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

tools_logger.debug("notify_tcp_socket:" + addr + ":" + port + ", msg:" + message)
tools_logger.debug(f"notify_tcp_socket: {addr}:{port}, msg: {message}")
try:
tcp_socket.connect((addr, int(port)))
except socket.error as exc:
tools_logger.error(
"notify_tcp_socket: Connection to "
+ addr
+ ":"
+ port
+ " raised exception socket.error: "
+ str(exc)
f"notify_tcp_socket: Connection to {addr}:{port} raised exception socket.error: {exc}"
)
return 0
message += "\n"
Expand Down Expand Up @@ -908,7 +902,7 @@ def resources2dump_perl(resources):


def get_oarexecuser_script_for_oarsub(
config, job, job_types, job_walltime, node_file, shell, resource_file
config, job, job_types, job_walltime, node_file, shell, resource_file
): # pragma: no cover
"""Create the shell script used to execute right command for the user
The resulting script can be launched with : bash -c 'script'
Expand All @@ -921,7 +915,7 @@ def get_oarexecuser_script_for_oarsub(

str_job_types = ""
for name_type, value_type in job_types.items():
if type(value_type) == bool:
if isinstance(value_type, bool):
value_type = str(int(value_type))
str_job_types += f"{name_type}={value_type},"
str_job_types = str_job_types[:-1]
Expand Down
78 changes: 41 additions & 37 deletions oar/modules/almighty.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,12 @@ def run(self, loop=True):
self.greta = start_greta()
# QGET
elif self.state == "Qget":
# if len(self.command_queue) > 0:
# self.read_commands(0)
# pass
# else:
self.read_commands(read_commands_timeout)
# Execute any commands already in the queue before reading new ones
if self.command_queue == []:
self.read_commands(read_commands_timeout)

logger.debug("Command queue: " + str(self.command_queue))

logger.debug("Command queue : " + str(self.command_queue))
command = self.command_queue.pop(0)
# Remove useless 'Time' command to enhance reactivity
if command == "Time" and self.command_queue != []:
Expand Down Expand Up @@ -420,43 +419,48 @@ def run(self, loop=True):
self.lastscheduler + self.scheduler_min_time_between_2_calls
):
self.scheduler_wanted = 0
# We put nodeChangeState() / self.state = "Change node state" after the scheduler:
# to enhance reactivity, "Change node state" is done after the scheduling round
# First, check pending events
check_result = nodeChangeState()
if check_result == 2:
self.state = "Leon"
self.add_command("Term")
elif check_result == 1:
#
# check_result = nodeChangeState()
# if check_result == 2:
# self.state = "Leon"
# self.add_command("Term")
# elif check_result == 1:
# self.state = "Scheduler"
# elif check_result == 0:
# Launch the scheduler
# We check Greta just before starting the scheduler
# because if the pipe is not read, it may freeze oar
if (energy_pid > 0) and not check_greta(self.greta, logger):
logger.warning(
"Energy saving module (greta) died. Restarting it."
)
time.sleep(5)
start_greta()

scheduler_result = self.meta_scheduler()
self.lastscheduler = tools.get_time()
if scheduler_result == 0:
# Change node state moved here after Scheduling (TODO: to remove if extensive test)
# self.state = "Time update"
self.state = "Change node state"
elif scheduler_result == 1:
self.state = "Scheduler"
elif check_result == 0:
# Launch the scheduler
# We check Greta just before starting the scheduler
# because if the pipe is not read, it may freeze oar
if (energy_pid > 0) and not check_greta(self.greta, logger):
logger.warning(
"Energy saving module (greta) died. Restarting it."
)
time.sleep(5)
start_greta()

scheduler_result = self.meta_scheduler()
self.lastscheduler = tools.get_time()
if scheduler_result == 0:
self.state = "Time update"
elif scheduler_result == 1:
self.state = "Scheduler"
elif scheduler_result == 2:
self.state = "Leon"
else:
logger.error(
"Scheduler returned an unknown value : scheduler_result"
)
finishTag = 1

elif scheduler_result == 2:
self.state = "Leon"
else:
logger.error(
"nodeChangeState_command returned an unknown value."
"Scheduler returned an unknown value : scheduler_result"
)
finishTag = 1

# else:
# logger.error(
# "nodeChangeState_command returned an unknown value."
# )
# finishTag = 1
else:
self.scheduler_wanted = 1
self.state = "Time update"
Expand Down
Loading

0 comments on commit 0acd95f

Please sign in to comment.