
Missing idleTimeout key in daskcluster_autoshutdown #882

Open
timomaier opened this issue Apr 15, 2024 · 16 comments · Fixed by #884

@timomaier

Describe the issue:

My KubeClusters sometimes do not get shut down properly on kubernetes when they're done with their work. Kubernetes logs state that there's an exception in a kopf finalizer which is retried indefinitely, apparently due to the spec dict given to daskcluster_autoshutdown:

  Timer 'daskcluster_autoshutdown' failed with an exception. Will retry.
  Traceback (most recent call last):
    File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/execution.py", line 276, in execute_handler_once
      result = await invoke_handler(
    File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/execution.py", line 371, in invoke_handler
      result = await invocation.invoke(
    File "/usr/local/lib/python3.10/site-packages/kopf/_core/actions/invocation.py", line 116, in invoke
      result = await fn(**kwargs)  # type: ignore
    File "/usr/local/lib/python3.10/site-packages/dask_kubernetes/operator/controller/controller.py", line 852, in daskcluster_autoshutdown
      if spec["idleTimeout"]:
    File "/usr/local/lib/python3.10/site-packages/kopf/_cogs/structs/dicts.py", line 297, in __getitem__
      return resolve(self._src, self._path + (item,))
    File "/usr/local/lib/python3.10/site-packages/kopf/_cogs/structs/dicts.py", line 121, in resolve
      result = result[key]
  KeyError: 'idleTimeout'
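The failure mode itself is ordinary dictionary indexing on a missing key; a minimal standalone illustration (plain dicts here, not kopf's actual wrapper classes):

```python
# Minimal illustration of the failure: the resource spec simply lacks the
# idleTimeout key, so indexing it raises KeyError on every timer interval.
spec = {"worker": {}, "scheduler": {}}  # idleTimeout never made it into the resource

try:
    spec["idleTimeout"]
except KeyError as e:
    print(f"KeyError: {e}")  # prints: KeyError: 'idleTimeout'
```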

When I remove these lines from the DaskCluster resource YAML in Kubernetes, the problem is gone:

    finalizers:
      - kopf.zalando.org/KopfFinalizerMarker

Is it correct that daskcluster_autoshutdown as below receives spec as a specification dict, e.g. from make_cluster_spec(..., idle_timeout=5)? I tried explicitly adding the idle_timeout, but the problem persists.

@kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
    if spec["idleTimeout"]:
        try:
            idle_since = await check_scheduler_idle(
                scheduler_service_name=f"{name}-scheduler",
                namespace=namespace,
                logger=logger,
            )
        except Exception:
            logger.warn("Unable to connect to scheduler, skipping autoshutdown check.")
            return
        if idle_since and time.time() > idle_since + spec["idleTimeout"]:
            cluster = await DaskCluster.get(name, namespace=namespace)
            await cluster.delete()

Not sure if this is a proper bug, or an issue with kopf, or anything is misconfigured on my end. Appreciate any help.
I'd also be fine with just removing the timer/finalizer if that's possible.

Anything else we need to know?:

Environment:

  • Dask version: 2024.4.1
  • Dask operator version: 2024.4.0
  • Python version: 3.10.12
  • kopf python version: 1.37.1
  • Operating System: Linux
  • Install method (conda, pip, source): pip
@jacobtomlinson
Member

It looks like the idle timeout option isn't making it through to the resource in Kubernetes. Could you describe the cluster resource and ensure it is set correctly? Could you also ensure you have the latest version of the operator installed?

@timomaier
Author

This is the config YAML of the DaskCluster (I removed unnecessary parts), in case it helps:

apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  annotations:
    kopf.zalando.org/last-handled-configuration: >
      {"spec":} # Same spec dict as below
  creationTimestamp: '2024-04-15T15:37:26Z'
  finalizers:
    - kopf.zalando.org/KopfFinalizerMarker
  generation: 4
  managedFields:
    - apiVersion: kubernetes.dask.org/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:spec:
          .: {}
          f:scheduler:
            .: {}
            f:service:
              .: {}
              f:ports:
                .: {}
                k:{"port":8786,"protocol":"TCP"}:
                  .: {}
                  f:name: {}
                  f:port: {}
                  f:protocol: {}
                  f:targetPort: {}
                k:{"port":8787,"protocol":"TCP"}:
                  .: {}
                  f:name: {}
                  f:port: {}
                  f:protocol: {}
                  f:targetPort: {}
              f:selector:
                .: {}
                f:dask.org/cluster-name: {}
                f:dask.org/component: {}
              f:type: {}
            f:spec:
              .: {}
              f:containers: {}
              f:imagePullSecrets: {}
          f:worker:
            .: {}
            f:replicas: {}
            f:spec:
              .: {}
              f:containers: {}
              f:imagePullSecrets: {}
              f:volumes: {}
        f:status:
          f:phase: {}
      manager: kr8s
      operation: Update
      time: '2024-04-15T15:37:26Z'
    - apiVersion: kubernetes.dask.org/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:annotations:
            .: {}
            f:kopf.zalando.org/last-handled-configuration: {}
          f:finalizers:
            .: {}
            v:"kopf.zalando.org/KopfFinalizerMarker": {}
        f:status: {}
      manager: kopf
      operation: Update
      time: '2024-04-15T15:37:27Z'
  name: dask-cluster
  namespace: dask-operator
  resourceVersion: '712042645'
  uid: 3c7db72d-8f94-4904-b0c6-3e496f9b1ff6
spec:
  scheduler:
    ...
  worker:
    ...
status:
  phase: Running

The operator is running image ghcr.io/dask/dask-kubernetes-operator:2024.4.0, and a pip list inside the pod shows

Package            Version
------------------ -----------
...
dask               2024.4.0
dask-kubernetes    0+unknown
distributed        2024.4.0

I'm using dask in conjunction with prefect, and the creation of the KubeCluster is handed over to the prefect DaskTaskRunner; however, the idle_timeout should be properly set via kwargs:

    spec = make_cluster_spec(
        name=f"dask-cluster-{getuser()}-{now}",
        # ...
        n_workers=n_workers,
        resources=resources,
        idle_timeout=5,
    )
    runner = DaskTaskRunner(
        cluster_class="dask_kubernetes.operator.KubeCluster",
        cluster_kwargs={
            "idle_timeout": 5,
            "custom_cluster_spec": spec,
            "namespace": "dask-operator",
        },
    )

Not sure if this is related somehow.

@jacobtomlinson
Member

jacobtomlinson commented Apr 17, 2024

This is strange; I don't see idleTimeout being set in your spec, but it should be set by make_cluster_spec().

Can you confirm that idleTimeout is set in your spec variable?

@jacobtomlinson
Member

pip list inside the pod shows

What about pip list from the machine you are running this code from?

@timomaier
Author

What about pip list from the machine you are running this code from?

Package                    Version
-------------------------- -----------------
dask                       2024.4.1
dask-kubernetes            2024.4.0
distributed                2024.4.1

Can you confirm that idleTimeout is set in your spec variable?

I can confirm, make_cluster_spec returns a dict like this

{
  'apiVersion': 'kubernetes.dask.org/v1',
  'kind': 'DaskCluster',
  'metadata': {'name': 'dask-cluster-2024-04-17_100447'},
  'spec': {'idleTimeout': 5, 'worker': {...}, 'scheduler': {...}}
}

The same spec dict is also present in the _custom_cluster_spec attribute of the KubeCluster instance after it is created. The idleTimeout attribute is also set in KubeCluster.

So, as you pointed out, it is set correctly but not given to the resource properly. The worker and scheduler specs look perfectly fine, though; it's just the idleTimeout that gets lost.

@jacobtomlinson
Member

Thanks for confirming. That dict gets passed straight to the create call, so there's nowhere for that key to get dropped in between. The only thing I can think is perhaps your CRDs are out of date and don't contain that property and so Kubernetes is silently dropping it. Can you uninstall the operator and ensure the CRDs have been cleaned up, then install it again?

@timomaier
Author

I did uninstall the operator and made sure everything related to it was gone (following this guide), then installed it again, but unfortunately the problem persists.

The exception is a bit annoying because it spams the logs of the Kubernetes cluster, but it's not critical. My core issue was that resources were not deleted properly; as a workaround I solved that by manually deleting all deployments related to dask using the kubernetes Python package.

I'm open to other suggestions, otherwise if other people do not see this problem feel free to close the issue. Thanks for the quick help so far!

@jacobtomlinson
Member

Yeah, it's just strange that the key is being dropped somewhere. I also feel like it may be specific to your setup because nobody else has reported it.

We could easily change spec["idleTimeout"] to spec.get("idleTimeout", 0), which would silence the log noise but not resolve the problem of things not timing out.
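A sketch of that guard, assuming nothing about the real controller beyond the handler snippet quoted earlier in this thread; note that the .get() default of 0 is falsy, so the handler would skip the check quietly instead of crashing:

```python
# Hypothetical guard extracted from the handler logic above: .get() avoids
# the KeyError when idleTimeout is absent, but 0 is falsy, so autoshutdown
# is silently skipped rather than actually fixed.
def should_check_idle(spec: dict) -> bool:
    return bool(spec.get("idleTimeout", 0))

print(should_check_idle({"idleTimeout": 5}))  # True: timeout present, check runs
print(should_check_idle({}))                  # False: no log noise, but no shutdown either
```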

@tsanikgr

tsanikgr commented Jun 10, 2024

Hello,

I think I am facing the same issue.

None of my attempts to have the cluster automatically shut down after it has been idle seem to work:

  • setting idle_timeout in the constructor of KubeCluster
  • setting idle_timeout in the make_cluster_spec function
  • setting the distributed.scheduler.idle-timeout dask config value

I used to get the same error as the OP before updating to the latest version of dask + operator. Now, I see the following log message every 5 seconds, but the cluster never shuts down:

Timer 'daskcluster_autoshutdown' succeeded.

If I describe the DaskCluster resource on my k8s cluster, then the idleTimeout key is missing from the spec.

Also, I have verified using debug breakpoints that data["spec"]["idleTimeout"] is properly set on this line. However, in the resulting DaskCluster object, cluster.spec.get("idleTimeout") is None and cluster.raw["spec"].get("idleTimeout") is None.

So as @jacobtomlinson said, it seems like this parameter is dropped somewhere in between the constructor call and the resource creation in the cluster. Uninstalling and re-installing the operator unfortunately did not fix the issue.


package versions:
dask==2024.5.2
distributed==2024.5.2
dask-kubernetes==2024.5.0

dask-kubernetes-operator-2024.5.0 helm chart with app version 2022.4.1

@slevang

slevang commented Feb 12, 2025

I'm having the same issue, idle_timeout doesn't work and I see the same error as in the OP over and over.

@jacobtomlinson
Member

Can you uninstall the operator and ensure the CRDs have been cleaned up, then install it again?

Can I just check that when folks are trying to resolve this that you are completely cleaning up the CRDs? Helm does not do this for you, you need to do it manually.

@slevang

slevang commented Feb 13, 2025

OK, I figured it out. I did the helm reinstall with a deletion of the CRDs, then got the same log as @tsanikgr, but saw idleTimeout: 0 in the spec despite having passed a different value.

I realized the issue was that I was using a custom_cluster_spec, but only passing idle_timeout to the KubeCluster constructor and not the creation of the spec. self.idle_timeout only gets used in this pathway and not the else branch. Maybe a slight improvement could be made there to avoid the confusion?
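A minimal model of that precedence (an illustrative stand-in, not the real KubeCluster code): when custom_cluster_spec is supplied, constructor kwargs such as idle_timeout are never consulted, so the timeout has to be baked into the spec itself.

```python
# Toy model of the behavior slevang describes; resolve_spec is a
# hypothetical stand-in for the spec-selection branch in KubeCluster.
def resolve_spec(custom_cluster_spec=None, idle_timeout=None):
    if custom_cluster_spec is not None:
        return custom_cluster_spec  # constructor kwargs are ignored entirely
    return {"spec": {"idleTimeout": idle_timeout or 0}}

custom = {"spec": {"worker": {}, "scheduler": {}}}  # built without idle_timeout
resolved = resolve_spec(custom, idle_timeout=5)
print("idleTimeout" in resolved["spec"])  # False: the kwarg was silently dropped
```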

@jacobtomlinson
Member

jacobtomlinson commented Feb 14, 2025

@slevang ah thanks for digging into this. That makes sense. I wonder what the better approach would be:

  1. If custom_cluster_spec is set we check that none of the other kwargs are set. If they are we raise an exception saying "Either use kwargs or a custom_cluster_spec, not both".
  2. If both kwargs and custom_cluster_spec are set we attempt to merge the two

As a user which of these behaviours would you find least surprising?

@slevang

slevang commented Feb 15, 2025

  1. is probably least ambiguous but could be a major breaking change if we just start raising in this situation. Surely others are mixing and matching kwargs between these two objects. Warning that custom_cluster_spec takes precedence rather than raising would be a simple change, and agrees with the current docstring:

Path to a YAML manifest or a dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments.

  2. would generally be a good UX but is maybe a little too magic.

@jacobtomlinson
Member

Surely others are mixing and matching kwargs between these two objects.

Sure, but doing so leads to this bug. I agree that an exception may be disruptive to people who are doing this. But warnings are easy to ignore. I guess it boils down to how problematic we think this issue is. Does setting both things lead to unexpected results from your Dask cluster?

In this situation I think it could lead to unexpected costs if users are relying on idle_timeout and it's not being set. This seems serious enough that maybe it should be an exception.

@jacobtomlinson
Member

So just to write up next steps in case anyone wants to pick this up:

In the KubeCluster object the custom_cluster_spec and most other kwargs are mutually exclusive. If you set custom_cluster_spec it will ignore other kwargs.

if not self._custom_cluster_spec:
    self._log("Generating cluster spec")
    data = make_cluster_spec(
        name=self.name,
        env=self.env,
        resources=self.resources,
        worker_command=self.worker_command,
        n_workers=self.n_workers,
        image=self.image,
        scheduler_service_type=self.scheduler_service_type,
        idle_timeout=self.idle_timeout,
        jupyter=self.jupyter,
    )
else:
    data = self._custom_cluster_spec

We should add a check to the __init__ of KubeCluster that raises an exception if both custom_cluster_spec and any of the kwargs that get passed to make_cluster_spec() are set. The exception should instruct folks to set these options when they create the custom spec.

def __init__(
    self,
    *,
    name: Optional[str] = None,
    namespace: Optional[str] = None,
    image: Optional[str] = None,
    n_workers: Optional[int] = None,
    resources: Optional[Dict[str, str]] = None,
    env: Optional[List[dict] | Dict[str, str]] = None,
    worker_command: Optional[List[str]] = None,
    port_forward_cluster_ip: Optional[bool] = None,
    create_mode: Optional[CreateMode] = None,
    shutdown_on_close: Optional[bool] = None,
    idle_timeout: Optional[int] = None,
    resource_timeout: Optional[int] = None,
    scheduler_service_type: Optional[str] = None,
    custom_cluster_spec: Optional[str | dict] = None,
    scheduler_forward_port: Optional[int] = None,
    jupyter: bool = False,
    loop: Optional[IOLoop] = None,
    asynchronous: bool = False,
    quiet: bool = False,
    **kwargs,
):

We also need a test that confirms the exception is raised in cases where, for example, both custom_cluster_spec and idle_timeout are passed to KubeCluster.
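Such a test could look roughly like this; a stub stands in for KubeCluster here so the shape is clear, and the real test would construct the actual class (the exception text is illustrative):

```python
# Sketch of the requested test, using a hypothetical stub in place of the
# real KubeCluster constructor.
def fake_kube_cluster(custom_cluster_spec=None, idle_timeout=None, **kwargs):
    if custom_cluster_spec is not None and idle_timeout is not None:
        raise ValueError("Either use kwargs or a custom_cluster_spec, not both")

def test_custom_spec_and_kwargs_raise():
    try:
        fake_kube_cluster(custom_cluster_spec={"spec": {}}, idle_timeout=5)
    except ValueError:
        return  # expected: mixing both options is rejected
    raise AssertionError("expected ValueError when both options are set")

test_custom_spec_and_kwargs_raise()
```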
