Skip to content

Commit

Permalink
Add ignore label support
Browse files Browse the repository at this point in the history
  • Loading branch information
jkupferer committed Jul 16, 2024
1 parent da99893 commit cc17877
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 22 deletions.
1 change: 1 addition & 0 deletions operator/anarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Anarchy():
event_label = f"{domain}/event"
finished_label = f"{domain}/finished"
governor_label = f"{domain}/governor"
ignore_label = f"{domain}/ignore"
runner_label = f"{domain}/runner"
runner_terminating_label = f"{domain}/runner-terminating"
spec_sha256_annotation = f"{domain}/spec-sha256"
Expand Down
4 changes: 3 additions & 1 deletion operator/anarchycachedkopfobject.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from anarchy import Anarchy
from anarchykopfobject import AnarchyKopfObject

class AnarchyCachedKopfObject(AnarchyKopfObject):
Expand All @@ -7,7 +8,8 @@ async def get(cls, name):
if obj:
return obj
obj = await cls.fetch(name)
cls.cache[name] = obj
if not obj.ignore:
cls.cache[name] = obj
return obj

@classmethod
Expand Down
4 changes: 4 additions & 0 deletions operator/anarchykopfobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def deletion_timestamp(self):
def finalizers(self):
return self.metadata.get('finalizers', [])

@property
def ignore(self):
return Anarchy.ignore_label in self.labels

@property
def is_deleted(self):
"""
Expand Down
131 changes: 110 additions & 21 deletions operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,28 @@ async def governor_event(event, logger, **_):
await AnarchyGovernor.handle_event(event)


@kopf.on.create(Anarchy.domain, Anarchy.version, 'anarchysubjects')
@kopf.on.create(
Anarchy.domain, Anarchy.version, 'anarchysubjects',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def subject_create(**kwargs):
anarchy_subject = AnarchySubject.load(**kwargs)
async with anarchy_subject.lock:
await anarchy_subject.handle_create()

@kopf.on.delete(Anarchy.domain, Anarchy.version, 'anarchysubjects')
@kopf.on.delete(
Anarchy.domain, Anarchy.version, 'anarchysubjects',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def subject_delete(**kwargs):
anarchy_subject = AnarchySubject.load(**kwargs)
async with anarchy_subject.lock:
await anarchy_subject.handle_delete()

@kopf.on.resume(Anarchy.domain, Anarchy.version, 'anarchysubjects')
@kopf.on.resume(
Anarchy.domain, Anarchy.version, 'anarchysubjects',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def subject_resume(meta, name, **kwargs):
if 'deletionTimestamp' in meta \
and Anarchy.domain not in meta['finalizers'] \
Expand All @@ -72,14 +81,20 @@ async def subject_resume(meta, name, **kwargs):
async with anarchy_subject.lock:
await anarchy_subject.handle_resume()

@kopf.on.update(Anarchy.domain, Anarchy.version, 'anarchysubjects')
@kopf.on.update(
Anarchy.domain, Anarchy.version, 'anarchysubjects',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def subject_update(new, old, **kwargs):
anarchy_subject = AnarchySubject.load(**kwargs)
if old['spec'] != new['spec']:
async with anarchy_subject.lock:
await anarchy_subject.handle_update(previous_state=old)

@kopf.on.event(Anarchy.domain, Anarchy.version, 'anarchysubjects')
@kopf.on.event(
Anarchy.domain, Anarchy.version, 'anarchysubjects',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def subject_event(event, **_):
obj = event.get('object')
if not obj or obj.get('apiVersion') != Anarchy.api_version:
Expand Down Expand Up @@ -111,7 +126,10 @@ async def subject_event(event, **_):
logging.debug(f"Ignoring deleting AnarchySubject {name}")


@kopf.on.create(Anarchy.domain, Anarchy.version, 'anarchyactions')
@kopf.on.create(
Anarchy.domain, Anarchy.version, 'anarchyactions',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def action_create(**kwargs):
anarchy_action = AnarchyAction.load(**kwargs)
anarchy_subject = await anarchy_action.get_subject()
Expand All @@ -120,23 +138,31 @@ async def action_create(**kwargs):
f"{anarchy_action} references missing AnarchySubject {anarchy_action.subject_name}",
)
return
if anarchy_subject.ignore:
return

async with anarchy_subject.lock:
await anarchy_action.handle_create(anarchy_subject)

@kopf.on.delete(Anarchy.domain, Anarchy.version, 'anarchyactions')
@kopf.on.delete(
Anarchy.domain, Anarchy.version, 'anarchyactions',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def action_delete(**kwargs):
anarchy_action = AnarchyAction.load(**kwargs)
anarchy_action.remove_from_cache()

anarchy_subject = await anarchy_action.get_subject()
if not anarchy_subject:
if not anarchy_subject or anarchy_subject.ignore:
return

async with anarchy_subject.lock:
await anarchy_action.handle_delete(anarchy_subject)

@kopf.on.resume(Anarchy.domain, Anarchy.version, 'anarchyactions')
@kopf.on.resume(
Anarchy.domain, Anarchy.version, 'anarchyactions',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def action_resume(**kwargs):
anarchy_action = AnarchyAction.load(**kwargs)
anarchy_subject = await anarchy_action.get_subject()
Expand All @@ -145,11 +171,16 @@ async def action_resume(**kwargs):
f"{anarchy_action} references missing AnarchySubject {anarchy_action.subject_name}",
)
return
if anarchy_subject.ignore:
return

async with anarchy_subject.lock:
await anarchy_action.handle_resume(anarchy_subject)

@kopf.on.update(Anarchy.domain, Anarchy.version, 'anarchyactions')
@kopf.on.update(
Anarchy.domain, Anarchy.version, 'anarchyactions',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def action_update(**kwargs):
anarchy_action = AnarchyAction.load(**kwargs)
anarchy_subject = await anarchy_action.get_subject()
Expand All @@ -160,11 +191,17 @@ async def action_update(**kwargs):
f"{anarchy_action} references missing AnarchySubject {anarchy_action.subject_name}",
)
return
if anarchy_subject.ignore:
return

async with anarchy_subject.lock:
await anarchy_action.handle_update(anarchy_subject)

@kopf.daemon(Anarchy.domain, Anarchy.version, 'anarchyactions', cancellation_timeout=1)
@kopf.daemon(
Anarchy.domain, Anarchy.version, 'anarchyactions',
cancellation_timeout=1,
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def action_daemon(stopped, **kwargs):
anarchy_action = AnarchyAction.load(**kwargs)
anarchy_subject = await anarchy_action.get_subject()
Expand All @@ -177,31 +214,42 @@ async def action_daemon(stopped, **kwargs):
try:
while not stopped:
async with anarchy_subject.lock:
if anarchy_subject.ignore:
return
sleep_interval = await anarchy_action.manage(anarchy_subject)
if sleep_interval:
await asyncio.sleep(sleep_interval)
except asyncio.CancelledError:
pass


@kopf.on.delete(Anarchy.domain, Anarchy.version, 'anarchyruns')
@kopf.on.delete(
Anarchy.domain, Anarchy.version, 'anarchyruns',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def run_delete(**kwargs):
anarchy_run = AnarchyRun.load(**kwargs)
anarchy_run.remove_from_cache()

anarchy_subject = await anarchy_run.get_subject()
if not anarchy_subject:
if not anarchy_subject or anarchy_subject.ignore:
return

if anarchy_run.has_action:
anarchy_action = await anarchy_run.get_action()
else:
anarchy_action = None

if anarchy_action and anarchy_action.ignore:
return

async with anarchy_subject.lock:
await anarchy_run.handle_delete(anarchy_subject, anarchy_action)

@kopf.on.resume(Anarchy.domain, Anarchy.version, 'anarchyruns')
@kopf.on.resume(
Anarchy.domain, Anarchy.version, 'anarchyruns',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def run_resume(**kwargs):
anarchy_run = AnarchyRun.load(**kwargs)
anarchy_subject = await anarchy_run.get_subject()
Expand All @@ -210,6 +258,8 @@ async def run_resume(**kwargs):
f"{anarchy_run} references missing AnarchySubject {anarchy_run.subject_name}",
)
return
if anarchy_subject.ignore:
return

if anarchy_run.has_action:
anarchy_action = await anarchy_run.get_action()
Expand All @@ -221,10 +271,16 @@ async def run_resume(**kwargs):
else:
anarchy_action = None

if anarchy_action and anarchy_action.ignore:
return

async with anarchy_subject.lock:
await anarchy_run.handle_resume(anarchy_subject, anarchy_action)

@kopf.on.update(Anarchy.domain, Anarchy.version, 'anarchyruns')
@kopf.on.update(
Anarchy.domain, Anarchy.version, 'anarchyruns',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def run_update(new, old, **kwargs):
anarchy_run = AnarchyRun.load(**kwargs)
anarchy_subject = await anarchy_run.get_subject()
Expand All @@ -235,6 +291,8 @@ async def run_update(new, old, **kwargs):
f"{anarchy_run} references missing AnarchySubject {anarchy_run.subject_name}",
)
return
if anarchy_subject.ignore:
return

if anarchy_run.has_action:
anarchy_action = await anarchy_run.get_action()
Expand All @@ -248,6 +306,9 @@ async def run_update(new, old, **kwargs):
else:
anarchy_action = None

if anarchy_action and anarchy_action.ignore:
return

new_runner_state = new['metadata'].get('labels', {}).get(Anarchy.runner_label)
old_runner_state = old['metadata'].get('labels', {}).get(Anarchy.runner_label)
async with anarchy_subject.lock:
Expand All @@ -267,7 +328,11 @@ async def run_update(new, old, **kwargs):
else:
await anarchy_run.handle_running(anarchy_subject, anarchy_action)

@kopf.daemon(Anarchy.domain, Anarchy.version, 'anarchyruns', cancellation_timeout=1)
@kopf.daemon(
Anarchy.domain, Anarchy.version, 'anarchyruns',
cancellation_timeout=1,
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def run_daemon(stopped, **kwargs):
anarchy_run = AnarchyRun.load(**kwargs)
anarchy_subject = await anarchy_run.get_subject()
Expand All @@ -276,6 +341,8 @@ async def run_daemon(stopped, **kwargs):
f"{anarchy_run} references missing AnarchySubject {anarchy_run.subject_name}",
)
return
if anarchy_subject.ignore:
return

if anarchy_run.has_action:
anarchy_action = await anarchy_run.get_action()
Expand All @@ -289,6 +356,10 @@ async def run_daemon(stopped, **kwargs):

try:
while not stopped:
if anarchy_subject.ignore:
return
if anarchy_action and anarchy_action.ignore:
return
async with anarchy_subject.lock:
sleep_interval = await anarchy_run.manage(anarchy_subject, anarchy_action)
if sleep_interval:
Expand All @@ -297,31 +368,47 @@ async def run_daemon(stopped, **kwargs):
pass

if not Anarchy.running_all_in_one:
@kopf.on.create(Anarchy.domain, Anarchy.version, 'anarchyrunners')
@kopf.on.create(
Anarchy.domain, Anarchy.version, 'anarchyrunners',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def runner_create(logger, **kwargs):
anarchy_runner = AnarchyRunner.load(**kwargs)
async with anarchy_runner.lock:
await anarchy_runner.handle_create(logger=logger)

@kopf.on.delete(Anarchy.domain, Anarchy.version, 'anarchyrunners')
@kopf.on.delete(
Anarchy.domain, Anarchy.version, 'anarchyrunners',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def runner_delete(logger, **kwargs):
anarchy_runner = AnarchyRunner.load(**kwargs)
async with anarchy_runner.lock:
await anarchy_runner.handle_delete(logger=logger)

@kopf.on.resume(Anarchy.domain, Anarchy.version, 'anarchyrunners')
@kopf.on.resume(
Anarchy.domain, Anarchy.version, 'anarchyrunners',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def runner_resume(logger, **kwargs):
anarchy_runner = AnarchyRunner.load(**kwargs)
async with anarchy_runner.lock:
await anarchy_runner.handle_resume(logger=logger)

@kopf.on.update(Anarchy.domain, Anarchy.version, 'anarchyrunners')
@kopf.on.update(
Anarchy.domain, Anarchy.version, 'anarchyrunners',
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def runner_update(logger, **kwargs):
anarchy_runner = AnarchyRunner.load(**kwargs)
async with anarchy_runner.lock:
await anarchy_runner.handle_update(logger=logger)

@kopf.daemon(Anarchy.domain, Anarchy.version, 'anarchyrunners', cancellation_timeout=1)
@kopf.daemon(
Anarchy.domain, Anarchy.version, 'anarchyrunners',
cancellation_timeout=1,
labels={Anarchy.ignore_label: kopf.ABSENT},
)
async def runner_daemon(logger, stopped, **kwargs):
anarchy_runner = AnarchyRunner.load(**kwargs)
while not stopped:
Expand All @@ -347,6 +434,8 @@ async def runner_pod_event(event, logger, **_):
if not anarchy_runner:
logger.warning(f"AnarchyRunner {anarchy_runner_name} not found for Pod {pod.metadata.name}")
return
if anarchy_runner.ignore:
return
async with anarchy_runner.lock:
if event['type'] == 'DELETED':
await anarchy_runner.handle_runner_pod_deleted(pod=pod, logger=logger)
Expand Down

0 comments on commit cc17877

Please sign in to comment.