diff --git a/docker/worker_entrypoint.sh b/docker/worker_entrypoint.sh
index 056a20b17..9e78003c1 100755
--- a/docker/worker_entrypoint.sh
+++ b/docker/worker_entrypoint.sh
@@ -41,11 +41,17 @@ if [[ -n "${WORKER_PROCESSES:-}" ]]; then
   log "Worker starting ${WORKER_PROCESSES} processes"
 fi
 
+if [[ -n "${WORKER_THREADS:-}" ]]; then
+  threads="--threads ${WORKER_THREADS}"
+  log "Worker starting with ${WORKER_THREADS} threads per process"
+fi
+
 while : ; do
   /usr/local/bin/dramatiq \
     ${verbose:-} \
     ${watch:-} \
     ${processes:-} \
+    ${threads:-} \
     lifemonitor.tasks.worker:broker lifemonitor.tasks.tasks
   exit_code=$?
   if [[ $exit_code == 3 ]]; then
diff --git a/k8s/Chart.yaml b/k8s/Chart.yaml
index 208c09375..1d3baf92c 100644
--- a/k8s/Chart.yaml
+++ b/k8s/Chart.yaml
@@ -7,12 +7,12 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.3.0
+version: 0.4.0
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
-appVersion: 0.1.0
+appVersion: 0.4.0
 
 # Chart dependencies
 dependencies:
diff --git a/k8s/templates/_helpers.tpl b/k8s/templates/_helpers.tpl
index ebeb4ae3e..40f949c38 100644
--- a/k8s/templates/_helpers.tpl
+++ b/k8s/templates/_helpers.tpl
@@ -35,9 +35,9 @@ Common labels
 */}}
 {{- define "chart.labels" -}}
 app.kubernetes.io/name: {{ include "chart.name" . }}
-helm.sh/chart: {{ include "chart.chart" . }}
 app.kubernetes.io/instance: {{ .Release.Name }}
 app.kubernetes.io/managed-by: {{ .Release.Service }}
+helm.sh/chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
 {{- end }}
 
 {{/*
@@ -48,6 +48,19 @@ app.kubernetes.io/name: {{ include "chart.name" . }}
 app.kubernetes.io/instance: {{ .Release.Name }}
 {{- end }}
 
+{{/*
+
+Define lifemonitor image
+*/}}
+{{- define "chart.lifemonitor.image" -}}
+{{- if .Values.lifemonitor.image }}
+{{- printf "%s" .Values.lifemonitor.image }}
+{{- else }}
+{{- printf "crs4/lifemonitor:%s" .Chart.AppVersion }}
+{{- end }}
+{{- end }}
+
+
 {{/*
 Create the name of the service account to use
 */}}
@@ -77,6 +90,10 @@ Define environment variables shared by some pods.
   value: "{{ .Values.postgresql.postgresqlPassword }}"
 - name: POSTGRESQL_DATABASE
   value: "{{ .Values.postgresql.postgresqlDatabase }}"
+- name: REDIS_HOST
+  value: "{{ .Release.Name }}-redis-master"
+- name: WORKER_PROCESSES
+  value: "{{ .Values.worker.processes }}"
 - name: LIFEMONITOR_TLS_KEY
   value: "/lm/certs/tls.key"
 - name: LIFEMONITOR_TLS_CERT
diff --git a/k8s/templates/backend-deployment.yaml b/k8s/templates/backend-deployment.yaml
index 7ee59cd8f..3c9ec93df 100644
--- a/k8s/templates/backend-deployment.yaml
+++ b/k8s/templates/backend-deployment.yaml
@@ -4,6 +4,7 @@ metadata:
   name: {{ include "chart.fullname" . }}-backend
   labels:
     {{- include "chart.labels" . | nindent 4 }}
+    app.kubernetes.io/component: backend
 spec:
   {{- if not .Values.lifemonitor.autoscaling.enabled }}
   replicas: {{ .Values.lifemonitor.replicaCount }}
@@ -11,14 +12,17 @@ spec:
   selector:
     matchLabels:
       {{- include "chart.selectorLabels" . 
| nindent 6 }} + app.kubernetes.io/component: backend template: metadata: - {{- with .Values.lifemonitor.podAnnotations }} annotations: + checksum/settings: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }} + {{- with .Values.lifemonitor.podAnnotations }} {{- toYaml . | nindent 8 }} - {{- end }} + {{- end }} labels: {{- include "chart.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: backend spec: {{- with .Values.lifemonitor.imagePullSecrets }} imagePullSecrets: @@ -36,12 +40,12 @@ spec: - name: app securityContext: {{- toYaml .Values.lifemonitor.securityContext | nindent 12 }} - image: {{ .Values.lifemonitor.image }} + image: {{ include "chart.lifemonitor.image" . }} imagePullPolicy: {{ .Values.lifemonitor.imagePullPolicy }} env: -{{ include "lifemonitor.common-env" . | indent 12 }} + {{- include "lifemonitor.common-env" . | nindent 12 }} volumeMounts: -{{ include "lifemonitor.common-volume-mounts" . | indent 12 }} + {{- include "lifemonitor.common-volume-mounts" . | nindent 12 }} ports: - name: http containerPort: 8000 @@ -61,7 +65,7 @@ spec: resources: {{- toYaml .Values.lifemonitor.resources | nindent 12 }} volumes: -{{ include "lifemonitor.common-volume" . | indent 12 }} + {{- include "lifemonitor.common-volume" . | nindent 8 }} {{- with .Values.lifemonitor.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/k8s/templates/job-init.yaml b/k8s/templates/job-init.yaml new file mode 100644 index 000000000..a20595777 --- /dev/null +++ b/k8s/templates/job-init.yaml @@ -0,0 +1,35 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ include "chart.fullname" . }}-init + labels: + {{- include "chart.labels" . | nindent 4 }} +spec: + template: + spec: + containers: + - name: lifemonitor-init + image: {{ include "chart.lifemonitor.image" . }} + imagePullPolicy: {{ .Values.lifemonitor.imagePullPolicy }} + command: ["/bin/sh","-c"] + args: ["wait-for-postgres.sh && flask init db && flask task-queue reset"] + env: + {{- include "lifemonitor.common-env" . | nindent 10 }} + volumeMounts: + {{- include "lifemonitor.common-volume-mounts" . | nindent 10 }} + restartPolicy: OnFailure + volumes: + {{- include "lifemonitor.common-volume" . | nindent 8 }} + {{- with .Values.lifemonitor.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.lifemonitor.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.lifemonitor.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + backoffLimit: 4 diff --git a/k8s/templates/init-job.yaml b/k8s/templates/job-upgrade.yaml similarity index 62% rename from k8s/templates/init-job.yaml rename to k8s/templates/job-upgrade.yaml index 243a1c8a6..34008f66f 100644 --- a/k8s/templates/init-job.yaml +++ b/k8s/templates/job-upgrade.yaml @@ -1,18 +1,24 @@ apiVersion: batch/v1 kind: Job metadata: - name: {{ include "chart.fullname" . }}-init + name: {{ include "chart.fullname" . }}-upgrade labels: {{- include "chart.labels" . | nindent 4 }} + annotations: + # This is what defines this resource as a hook. Without this line, the + # job is considered part of the release. + "helm.sh/hook": post-upgrade + "helm.sh/hook-weight": "-5" + "helm.sh/hook-delete-policy": hook-succeeded spec: - template: + template: spec: containers: - - name: lifemonitor-init - image: "{{ .Values.lifemonitor.image }}" + - name: lifemonitor-upgrade + image: {{ include "chart.lifemonitor.image" . 
}} imagePullPolicy: {{ .Values.lifemonitor.imagePullPolicy }} command: ["/bin/sh","-c"] - args: ["wait-for-postgres.sh && flask init db"] + args: ["wait-for-postgres.sh && flask init db && flask task-queue reset"] env: {{ include "lifemonitor.common-env" . | indent 10 }} volumeMounts: @@ -32,4 +38,4 @@ spec: tolerations: {{- toYaml . | nindent 8 }} {{- end }} - backoffLimit: 4 \ No newline at end of file + backoffLimit: 4 diff --git a/k8s/templates/secret.yaml b/k8s/templates/secret.yaml index 936cfc7e0..73af94f14 100644 --- a/k8s/templates/secret.yaml +++ b/k8s/templates/secret.yaml @@ -37,7 +37,10 @@ stringData: POSTGRESQL_PASSWORD={{ .Values.postgresql.postgresqlPassword }} # Dramatiq worker settings - WORKER_PROCESSES={{ .Values.lifemonitor.worker_processes }} + WORKER_PROCESSES={{ .Values.worker.processes }} + {{- if .Values.worker.threads }} + WORKER_THREADS={{ .Values.worker.threads }} + {{- end }} # Redis settings REDIS_HOST={{ .Release.Name }}-redis-master @@ -45,11 +48,14 @@ stringData: REDIS_PASSWORD={{ .Values.redis.auth.password }} # Redis Cache - CACHE_REDIS_DB=0 - CACHE_REDIS_URL=redis://:{{ .Values.redis.auth.password }}@{{ .Release.Name }}-redis-master:{{ .Values.redis.master.service.port }}/0 - CACHE_DEFAULT_TIMEOUT=300 - CACHE_SESSION_TIMEOUT=3600 - CACHE_BUILDS_TIMEOUT=84600 + CACHE_REDIS_HOST={{ .Release.Name }}-redis-master + CACHE_REDIS_DB={{ .Values.cache.db }} + CACHE_REDIS_URL=redis://:{{ .Values.redis.auth.password }}@{{ .Release.Name }}-redis-master:{{ .Values.redis.master.service.port }}/{{ .Values.cache.db }} + CACHE_DEFAULT_TIMEOUT={{ .Values.cache.timeout.default }} + CACHE_REQUEST_TIMEOUT={{ .Values.cache.timeout.request }} + CACHE_SESSION_TIMEOUT={{ .Values.cache.timeout.session }} + CACHE_WORKFLOW_TIMEOUT={{ .Values.cache.timeout.workflow }} + CACHE_BUILD_TIMEOUT={{ .Values.cache.timeout.build }} # Set admin credentials LIFEMONITOR_ADMIN_PASSWORD={{ .Values.lifemonitor.administrator.password }} diff --git a/k8s/templates/service.yaml b/k8s/templates/service.yaml index 90cb29f12..ac8c8ccf6 100644 --- a/k8s/templates/service.yaml +++ b/k8s/templates/service.yaml @@ -13,3 +13,4 @@ spec: name: http selector: {{- include "chart.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: backend diff --git a/k8s/templates/worker-deployment.yaml b/k8s/templates/worker-deployment.yaml index cede78b95..ce99c4c61 100644 --- a/k8s/templates/worker-deployment.yaml +++ b/k8s/templates/worker-deployment.yaml @@ -5,70 +5,71 @@ metadata: labels: {{- include "chart.labels" . | nindent 4 }} spec: - {{- if not .Values.lifemonitor.autoscaling.enabled }} - replicas: {{ .Values.lifemonitor.replicaCount }} + {{- if not .Values.worker.autoscaling.enabled }} + replicas: {{ .Values.worker.replicaCount }} {{- end }} selector: matchLabels: {{- include "chart.selectorLabels" . | nindent 6 }} template: metadata: - {{- with .Values.lifemonitor.podAnnotations }} annotations: + checksum/settings: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }} + {{- with .Values.worker.podAnnotations }} {{- toYaml . | nindent 8 }} {{- end }} labels: {{- include "chart.selectorLabels" . | nindent 8 }} spec: - {{- with .Values.lifemonitor.imagePullSecrets }} + {{- with .Values.worker.imagePullSecrets }} imagePullSecrets: {{- toYaml . | nindent 8 }} {{- end }} serviceAccountName: {{ include "chart.serviceAccountName" . 
}} securityContext: - {{- toYaml .Values.lifemonitor.podSecurityContext | nindent 8 }} + {{- toYaml .Values.worker.podSecurityContext | nindent 8 }} initContainers: - name: init image: "crs4/k8s-wait-for:latest" imagePullPolicy: IfNotPresent args: ["job", "{{ include "chart.fullname" . }}-init"] containers: - - name: app + - name: worker securityContext: - {{- toYaml .Values.lifemonitor.securityContext | nindent 12 }} - image: {{ .Values.lifemonitor.image }} - imagePullPolicy: {{ .Values.lifemonitor.imagePullPolicy }} + {{- toYaml .Values.worker.securityContext | nindent 12 }} + image: {{ include "chart.lifemonitor.image" . }} + imagePullPolicy: {{ .Values.worker.imagePullPolicy }} command: ["/bin/sh","-c"] args: ["/usr/local/bin/worker_entrypoint.sh"] env: -{{ include "lifemonitor.common-env" . | indent 12 }} + {{- include "lifemonitor.common-env" . | nindent 12 }} volumeMounts: -{{ include "lifemonitor.common-volume-mounts" . | indent 12 }} - livenessProbe: - httpGet: - scheme: HTTPS - path: /health - port: 8000 - readinessProbe: - httpGet: - scheme: HTTPS - path: /health - port: 8000 - initialDelaySeconds: 5 - periodSeconds: 3 + {{- include "lifemonitor.common-volume-mounts" . | nindent 12 }} + # livenessProbe: + # httpGet: + # scheme: HTTPS + # path: /health + # port: 8000 + # readinessProbe: + # httpGet: + # scheme: HTTPS + # path: /health + # port: 8000 + # initialDelaySeconds: 5 + # periodSeconds: 3 resources: - {{- toYaml .Values.lifemonitor.resources | nindent 12 }} + {{- toYaml .Values.worker.resources | nindent 12 }} volumes: -{{ include "lifemonitor.common-volume" . | indent 12 }} - {{- with .Values.lifemonitor.nodeSelector }} + {{- include "lifemonitor.common-volume" . | nindent 8 }} + {{- with .Values.worker.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} {{- end }} - {{- with .Values.lifemonitor.affinity }} + {{- with .Values.worker.affinity }} affinity: {{- toYaml . | nindent 8 }} {{- end }} - {{- with .Values.lifemonitor.tolerations }} + {{- with .Values.worker.tolerations }} tolerations: {{- toYaml . | nindent 8 }} {{- end }} diff --git a/k8s/values.yaml b/k8s/values.yaml index e0c9fef4c..07683f742 100644 --- a/k8s/values.yaml +++ b/k8s/values.yaml @@ -59,10 +59,19 @@ testing_services: # token: # type: travis +cache: + db: 0 + timeout: + default: 30 + request: 15 + session: 3600 + workflow: 1800 + build: 84600 + lifemonitor: replicaCount: 1 - image: &lifemonitorImage crs4/lifemonitor:master + image: &lifemonitorImage crs4/lifemonitor:0.4.0 imagePullPolicy: &lifemonitorImagePullPolicy Always imagePullSecrets: [] @@ -96,16 +105,65 @@ lifemonitor: type: ClusterIP port: 8000 - # Dramatiq worker settings - worker_processes: 1 - persistence: storageClass: *storageClass # Enable/Disable the pod to test connection to the LifeMonitor back-end enableTestConnection: false - resources: {} + resources: + {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. 
+ # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 100 + targetCPUUtilizationPercentage: 80 + # targetMemoryUtilizationPercentage: 80 + + nodeSelector: {} + + tolerations: [] + + affinity: {} + +worker: + image: *lifemonitorImage + imagePullPolicy: *lifemonitorImagePullPolicy + imagePullSecrets: [] + + processes: 1 + #threads: 1 + + podAnnotations: {} + + podSecurityContext: + {} + # fsGroup: 2000 + + securityContext: + {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + + replicaCount: 1 + + resources: + {} # We usually recommend not to specify default resources and to leave this as a conscious # choice for the user. This also increases chances charts run on environments with little # resources, such as Minikube. If you do want to specify resources, uncomment the following diff --git a/lifemonitor/api/controllers.py b/lifemonitor/api/controllers.py index 6417f9fd1..75e3dfcb2 100644 --- a/lifemonitor/api/controllers.py +++ b/lifemonitor/api/controllers.py @@ -48,16 +48,14 @@ def _row_to_dict(row): return d -# @authorized -@cached() +@cached(timeout=Timeout.REQUEST) def workflow_registries_get(): registries = lm.get_workflow_registries() logger.debug("registries_get. Got %s registries", len(registries)) return serializers.ListOfWorkflowRegistriesSchema().dump(registries) -# @authorized -@cached() +@cached(timeout=Timeout.REQUEST) def workflow_registries_get_by_uuid(registry_uuid): registry = lm.get_workflow_registry_by_uuid(registry_uuid) logger.debug("registries_get. Got %s registry", registry) @@ -65,7 +63,7 @@ def workflow_registries_get_by_uuid(registry_uuid): @authorized -@cached() +@cached(timeout=Timeout.REQUEST) def workflow_registries_get_current(): if current_registry: registry = current_registry @@ -74,7 +72,7 @@ def workflow_registries_get_current(): return lm_exceptions.report_problem(401, "Unauthorized") -@cached(timeout=Timeout.SESSION) +@cached(timeout=Timeout.REQUEST) def workflows_get(status=False): workflows = lm.get_public_workflows() if current_user and not current_user.is_anonymous: @@ -114,7 +112,7 @@ def _get_workflow_or_problem(wf_uuid, wf_version=None): detail=messages.unauthorized_workflow_access.format(wf_uuid)) -@cached() +@cached(timeout=Timeout.REQUEST) def workflows_get_by_id(wf_uuid, wf_version): response = _get_workflow_or_problem(wf_uuid, wf_version) return response if isinstance(response, Response) \ @@ -123,7 +121,7 @@ def workflows_get_by_id(wf_uuid, wf_version): else None).dump(response) -@cached() +@cached(timeout=Timeout.REQUEST) def workflows_get_latest_version_by_id(wf_uuid, previous_versions=False, ro_crate=False): response = _get_workflow_or_problem(wf_uuid, None) exclude = ['previous_versions'] if not previous_versions else [] @@ -134,14 +132,14 @@ def workflows_get_latest_version_by_id(wf_uuid, previous_versions=False, ro_crat subscriptionsOf=[current_user] if not current_user.is_anonymous else None).dump(response) -@cached() +@cached(timeout=Timeout.REQUEST) def workflows_get_versions_by_id(wf_uuid): response = _get_workflow_or_problem(wf_uuid, None) return response if isinstance(response, Response) \ else serializers.ListOfWorkflowVersions().dump(response.workflow) -@cached() +@cached(timeout=Timeout.REQUEST) def workflows_get_status(wf_uuid): wf_version = request.args.get('version', 'latest').lower() response = _get_workflow_or_problem(wf_uuid, wf_version) @@ -149,7 +147,7 
7 @@ def workflows_get_status(wf_uuid):
         else serializers.WorkflowStatusSchema().dump(response)
 
 
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def workflows_rocrate_metadata(wf_uuid, wf_version):
     response = _get_workflow_or_problem(wf_uuid, wf_version)
     if isinstance(response, Response):
@@ -157,7 +155,7 @@
     return response.crate_metadata
 
 
-@cached()
+@cached(timeout=Timeout.WORKFLOW, client_scope=False)
 def workflows_rocrate_download(wf_uuid, wf_version):
     response = _get_workflow_or_problem(wf_uuid, wf_version)
     if isinstance(response, Response):
@@ -177,7 +175,7 @@ def workflows_rocrate_download(wf_uuid, wf_version):
 
 
 @authorized
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def registry_workflows_get(status=False):
     workflows = lm.get_registry_workflows(current_registry)
     logger.debug("workflows_get. Got %s workflows (registry: %s)", len(workflows), current_registry)
@@ -188,12 +186,12 @@
 def registry_workflows_post(body):
     if not current_registry:
         return lm_exceptions.report_problem(401, "Unauthorized", detail=messages.no_registry_found)
-    clear_cache(registry_workflows_get)
+    clear_cache()
     return workflows_post(body)
 
 
 @authorized
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def registry_user_workflows_get(user_id, status=False):
     if not current_registry:
         return lm_exceptions.report_problem(401, "Unauthorized", detail=messages.no_registry_found)
@@ -212,12 +210,12 @@
 def registry_user_workflows_post(user_id, body):
     if not current_registry:
         return lm_exceptions.report_problem(401, "Unauthorized", detail=messages.no_registry_found)
-    clear_cache(registry_user_workflows_get, user_id)
+    clear_cache()
     return workflows_post(body, _submitter_id=user_id)
 
 
 @authorized
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def user_workflows_get(status=False, subscriptions=False):
     if not current_user or current_user.is_anonymous:
         return lm_exceptions.report_problem(401, "Unauthorized", detail=messages.no_user_in_session)
@@ -232,7 +230,7 @@
 def user_workflows_post(body):
     if not current_user or current_user.is_anonymous:
         return lm_exceptions.report_problem(401, "Unauthorized", detail=messages.no_user_in_session)
-    clear_cache(user_workflows_get)
+    clear_cache()
     return workflows_post(body)
 
 
@@ -243,8 +241,7 @@ def user_workflow_subscribe(wf_uuid):
         return response
     subscription = lm.subscribe_user_resource(current_user, response.workflow)
     logger.debug("Created new subscription: %r", subscription)
-    clear_cache(user_workflows_get)
-    clear_cache(workflows_get_latest_version_by_id)
+    clear_cache()
     return auth_serializers.SubscriptionSchema(exclude=('meta', 'links')).dump(subscription), 201
 
 
@@ -255,13 +252,12 @@ def user_workflow_unsubscribe(wf_uuid):
         return response
     subscription = lm.unsubscribe_user_resource(current_user, response.workflow)
     logger.debug("Delete subscription: %r", subscription)
-    clear_cache(user_workflows_get)
-    clear_cache(workflows_get_latest_version_by_id)
+    clear_cache()
     return connexion.NoContent, 204
 
 
 @authorized
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def user_registry_workflows_get(registry_uuid, status=False):
     if not current_user or current_user.is_anonymous:
         return lm_exceptions.report_problem(401, "Unauthorized", detail=messages.no_user_in_session)
@@ -282,7 +278,7 @@ def user_registry_workflows_post(registry_uuid, body):
         return lm_exceptions.report_problem(401, "Unauthorized", 
detail=messages.no_user_in_session) try: registry = lm.get_workflow_registry_by_uuid(registry_uuid) - clear_cache(user_registry_workflows_get, registry_uuid) + clear_cache() return workflows_post(body, _registry=registry) except lm_exceptions.EntityNotFoundException: return lm_exceptions.report_problem(404, "Not Found", @@ -344,7 +340,7 @@ def workflows_post(body, _registry=None, _submitter_id=None): public=body.get('public', False) ) logger.debug("workflows_post. Created workflow '%s' (ver.%s)", w.uuid, w.version) - clear_cache(workflows_get) + clear_cache() return {'uuid': str(w.workflow.uuid), 'wf_version': w.version, 'name': w.name}, 201 except KeyError as e: return lm_exceptions.report_problem(400, "Bad Request", extra_info={"exception": str(e)}, @@ -381,12 +377,7 @@ def workflows_put(wf_uuid, body): wv.workflow.name = body.get('name', wv.workflow.name) wv.workflow.public = body.get('public', wv.workflow.public) wv.workflow.save() - clear_cache(workflows_get) - clear_cache(workflows_get_by_id) - clear_cache(workflows_get_latest_version_by_id) - clear_cache(workflows_get_versions_by_id) - clear_cache(workflows_get_status) - clear_cache(registry_workflows_get) + clear_cache() return connexion.NoContent, 204 @@ -399,12 +390,7 @@ def workflows_version_put(wf_uuid, wf_version, body): wv.name = body.get('name', wv.name) wv.version = body.get('version', wv.version) wv.save() - clear_cache(workflows_get) - clear_cache(workflows_get_by_id) - clear_cache(workflows_get_latest_version_by_id) - clear_cache(workflows_get_versions_by_id) - clear_cache(workflows_get_status) - clear_cache(registry_workflows_get) + clear_cache() return connexion.NoContent, 204 @@ -418,7 +404,7 @@ def workflows_delete(wf_uuid, wf_version): else: return lm_exceptions.report_problem(403, "Forbidden", detail=messages.no_user_in_session) - clear_cache(workflows_get) + clear_cache() return connexion.NoContent, 204 except OAuthIdentityNotFoundException as e: return lm_exceptions.report_problem(401, "Unauthorized", extra_info={"exception": str(e)}) @@ -431,7 +417,7 @@ def workflows_delete(wf_uuid, wf_version): raise lm_exceptions.LifeMonitorException(title="Internal Error", detail=str(e)) -@cached() +@cached(timeout=Timeout.REQUEST) def workflows_get_suites(wf_uuid, version='latest'): response = _get_workflow_or_problem(wf_uuid, version) return response if isinstance(response, Response) \ @@ -466,29 +452,21 @@ def _get_suite_or_problem(suite_uuid): return lm_exceptions.report_problem(404, "Not Found", detail=messages.suite_not_found.format(suite_uuid)) -@cached() +@cached(timeout=Timeout.REQUEST) def suites_get_by_uuid(suite_uuid): response = _get_suite_or_problem(suite_uuid) return response if isinstance(response, Response) \ else serializers.SuiteSchema().dump(response) -@cached() +@cached(timeout=Timeout.REQUEST) def suites_get_status(suite_uuid): - try: - response = _get_suite_or_problem(suite_uuid) - return response if isinstance(response, Response) \ - else serializers.SuiteStatusSchema().dump(response.status) - except lm_exceptions.RateLimitExceededException as e: - logger.debug(e) - return { - "status": "not_available", - "latest_build": [], - "reason": str(e) - } + response = _get_suite_or_problem(suite_uuid) + return response if isinstance(response, Response) \ + else serializers.SuiteStatusSchema().dump(response.status) -@cached() +@cached(timeout=Timeout.REQUEST) def suites_get_instances(suite_uuid): response = _get_suite_or_problem(suite_uuid) return response if isinstance(response, Response) \ @@ -546,8 +524,8 @@ 
def suites_post_instance(suite_uuid): data['service']['type'], data['service']['url'], data['resource']) - clear_cache(suites_get_instances, suite_uuid) - return {'test_instance_uuid': str(test_instance.uuid)}, 201 + clear_cache() + return {'uuid': str(test_instance.uuid)}, 201 except KeyError as e: return lm_exceptions.report_problem(400, "Bad Request", extra_info={"exception": str(e)}, detail=messages.input_data_missing) @@ -582,7 +560,7 @@ def _get_instances_or_problem(instance_uuid): detail=messages.instance_not_found.format(instance_uuid)) -@cached() +@cached(timeout=Timeout.REQUEST) def instances_get_by_id(instance_uuid): response = _get_instances_or_problem(instance_uuid) return response if isinstance(response, Response) \ @@ -596,7 +574,7 @@ def instances_delete_by_id(instance_uuid): if isinstance(response, Response): return response lm.deregister_test_instance(response) - clear_cache(suites_get_instances, instance_uuid) + clear_cache() return connexion.NoContent, 204 except OAuthIdentityNotFoundException as e: return lm_exceptions.report_problem(401, "Unauthorized", extra_info={"exception": str(e)}) @@ -609,7 +587,7 @@ def instances_delete_by_id(instance_uuid): raise lm_exceptions.LifeMonitorException(title="Internal Error", detail=str(e)) -@cached() +@cached(timeout=Timeout.REQUEST) def instances_get_builds(instance_uuid, limit): response = _get_instances_or_problem(instance_uuid) logger.info("Number of builds to load: %r", limit) @@ -617,7 +595,7 @@ def instances_get_builds(instance_uuid, limit): else serializers.ListOfTestBuildsSchema().dump(response.get_test_builds(limit=limit)) -@cached() +@cached(timeout=Timeout.REQUEST) def instances_builds_get_by_id(instance_uuid, build_id): response = _get_instances_or_problem(instance_uuid) if isinstance(response, Response): @@ -635,6 +613,8 @@ def instances_builds_get_by_id(instance_uuid, build_id): return lm_exceptions\ .report_problem(404, "Not Found", detail=messages.instance_build_not_found.format(build_id, instance_uuid)) + except lm_exceptions.RateLimitExceededException as e: + return lm_exceptions.report_problem(403, e.title, detail=e.detail) except Exception as e: return lm_exceptions.report_problem(500, "Internal Error", extra_info={"exception": str(e)}) @@ -660,6 +640,8 @@ def instances_builds_get_logs(instance_uuid, build_id, offset_bytes=0, limit_byt return lm_exceptions\ .report_problem(404, "Not Found", detail=messages.instance_build_not_found.format(build_id, instance_uuid)) + except lm_exceptions.RateLimitExceededException as e: + return lm_exceptions.report_problem(403, e.title, detail=e.detail) except ValueError as e: return lm_exceptions.report_problem(400, "Bad Request", detail=str(e)) except Exception as e: diff --git a/lifemonitor/api/models/rocrate.py b/lifemonitor/api/models/rocrate.py index e7d2e2563..e166a6b81 100644 --- a/lifemonitor/api/models/rocrate.py +++ b/lifemonitor/api/models/rocrate.py @@ -83,6 +83,10 @@ def get_roc_suite(self, roc_suite_identifier): def dataset_name(self): return self._roc_helper.name + @property + def main_entity_name(self): + return self._roc_helper.mainEntity['name'] + @property def _roc_helper(self): if not self.__roc_helper: diff --git a/lifemonitor/api/models/services/github.py b/lifemonitor/api/models/services/github.py index 259885b4a..481a2002c 100644 --- a/lifemonitor/api/models/services/github.py +++ b/lifemonitor/api/models/services/github.py @@ -23,23 +23,23 @@ import itertools as it import logging import re -from typing import Generator, Optional, Tuple +from typing 
import Generator, List, Optional, Tuple from urllib.error import URLError from urllib.parse import urlparse import lifemonitor.api.models as models import lifemonitor.exceptions as lm_exceptions -from lifemonitor.cache import Timeout, cache +from lifemonitor.cache import Timeout, cached import github -from github import Github, GithubException +from github import Github, GithubException, Workflow from github import \ RateLimitExceededException as GithubRateLimitExceededException from .service import TestingService # set module level logger -logger = logging.getLogger() +logger = logging.getLogger(__name__) class GithubTestingService(TestingService): @@ -104,10 +104,14 @@ def _gh_service(self) -> Github: self.initialize() return self._gh_obj - @cache.memoize(timeout=Timeout.BUILDS) + def _get_workflow_info(self, resource): + return self._parse_workflow_url(resource) + + @cached(timeout=Timeout.NONE, client_scope=False, transactional_update=True) def _get_repo(self, test_instance: models.TestInstance): - _, repo_full_name, _ = self._parse_workflow_url(test_instance.resource) - repository = self._gh_obj.get_repo(repo_full_name) + logger.debug("Getting github repository from remote service...") + _, repo_full_name, _ = self._get_workflow_info(test_instance.resource) + repository = self._gh_service.get_repo(repo_full_name) logger.debug("Repo ID: %s", repository.id) logger.debug("Repo full name: %s", repository.full_name) logger.debug("Repo URL: %s", f'https://github.com/{repository.full_name}') @@ -134,13 +138,24 @@ def check_connection(self) -> bool: logger.info("Caught exception from Github GET /rate_limit: %s. Connection not working?", e) return False + @cached(timeout=Timeout.NONE, client_scope=False, transactional_update=True) + def _get_gh_workflow(self, repository, workflow_id): + logger.debug("Getting github workflow...") + return self._gh_service.get_repo(repository).get_workflow(workflow_id) + + @cached(timeout=Timeout.NONE, client_scope=False, transactional_update=True) + def _get_gh_workflow_runs(self, workflow: Workflow.Workflow) -> List: + return list(workflow.get_runs()) + def _iter_runs(self, test_instance: models.TestInstance, status: str = None) -> Generator[github.WorkflowRun.WorkflowRun]: - _, repository, workflow_id = self._parse_workflow_url(test_instance.resource) + _, repository, workflow_id = self._get_workflow_info(test_instance.resource) logger.debug("iterating over runs -- wf id: %s; repository: %s; status: %s", workflow_id, repository, status) - workflow = self._gh_service.get_repo(repository).get_workflow(workflow_id) + workflow = self._get_gh_workflow(repository, workflow_id) logger.debug("Retrieved workflow %s from github", workflow_id) - for run in workflow.get_runs(): + + for run in self._get_gh_workflow_runs(workflow): + logger.debug("Loading Github run ID %r", run.id) # The Workflow.get_runs method in the PyGithub API has a status argument # which in theory we could use to filter the runs that are retrieved to # only the ones with the status that interests us. 
This worked in the past, @@ -152,20 +167,19 @@ def _iter_runs(self, test_instance: models.TestInstance, status: str = None) -> if status is None or run.status == status: yield run - def get_instance_external_link(self, test_instance: models.TestInstance) -> str: - _, repo_full_name, workflow_id = self._parse_workflow_url(test_instance.resource) - return f'https://github.com/{repo_full_name}/actions/workflows/{workflow_id}' - def get_last_test_build(self, test_instance: models.TestInstance) -> Optional[GithubTestBuild]: try: + logger.debug("Getting latest build...") for run in self._iter_runs(test_instance, status=self.GithubStatus.COMPLETED): return GithubTestBuild(self, test_instance, run) + logger.debug("Getting latest build... DONE") return None except GithubRateLimitExceededException as e: raise lm_exceptions.RateLimitExceededException(detail=str(e), instance=test_instance) def get_last_passed_test_build(self, test_instance: models.TestInstance) -> Optional[GithubTestBuild]: try: + logger.debug("Getting last passed build...") for run in self._iter_runs(test_instance, status=self.GithubStatus.COMPLETED): if run.conclusion == self.GithubConclusion.SUCCESS: return GithubTestBuild(self, test_instance, run) @@ -175,6 +189,7 @@ def get_last_passed_test_build(self, test_instance: models.TestInstance) -> Opti def get_last_failed_test_build(self, test_instance: models.TestInstance) -> Optional[GithubTestBuild]: try: + logger.debug("Getting last failed build...") for run in self._iter_runs(test_instance, status=self.GithubStatus.COMPLETED): if run.conclusion == self.GithubConclusion.FAILURE: return GithubTestBuild(self, test_instance, run) @@ -184,6 +199,7 @@ def get_last_failed_test_build(self, test_instance: models.TestInstance) -> Opti def get_test_builds(self, test_instance: models.TestInstance, limit=10) -> list: try: + logger.debug("Getting test builds...") return list(GithubTestBuild(self, test_instance, run) for run in it.islice(self._iter_runs(test_instance), limit)) except GithubRateLimitExceededException as e: @@ -208,10 +224,17 @@ def get_test_build(self, test_instance: models.TestInstance, build_number: int) except GithubRateLimitExceededException as e: raise lm_exceptions.RateLimitExceededException(detail=str(e), instance=test_instance) + def get_instance_external_link(self, test_instance: models.TestInstance) -> str: + _, repo_full_name, workflow_id = self._get_workflow_info(test_instance.resource) + return f'https://github.com/{repo_full_name}/actions/workflows/{workflow_id}' + def get_test_build_external_link(self, test_build: models.TestBuild) -> str: - repo = test_build.test_instance.testing_service._get_repo(test_build.test_instance) + repo = self._get_repo(test_build.test_instance) return f'https://github.com/{repo.full_name}/actions/runs/{test_build.id}' + def get_test_build_output(self, test_instance: models.TestInstance, build_number, offset_bytes=0, limit_bytes=131072): + raise lm_exceptions.NotImplementedException(detail="not supported for GitHub test builds") + @classmethod def _parse_workflow_url(cls, resource: str) -> Tuple[str, str, str]: """ @@ -316,7 +339,3 @@ def timestamp(self) -> int: @property def url(self) -> str: return self._metadata.url - - @property - def external_link(self) -> str: - return self.testing_service.get_test_build_external_link(self) diff --git a/lifemonitor/api/models/services/jenkins.py b/lifemonitor/api/models/services/jenkins.py index 64648733b..842080c77 100644 --- a/lifemonitor/api/models/services/jenkins.py +++ 
b/lifemonitor/api/models/services/jenkins.py @@ -27,7 +27,7 @@ import lifemonitor.api.models as models import lifemonitor.exceptions as lm_exceptions -from lifemonitor.cache import Timeout, cache + from lifemonitor.lang import messages import jenkins @@ -79,32 +79,27 @@ def get_job_name(resource): f"Unable to get the Jenkins job from the resource {job_name}") return job_name - @cache.memoize() def get_instance_external_link(self, test_instance: models.TestInstance) -> str: return self.get_project_metadata(test_instance)['url'] - @cache.memoize() def get_last_test_build(self, test_instance: models.TestInstance) -> Optional[JenkinsTestBuild]: metadata = self.get_project_metadata(test_instance) if 'lastBuild' in metadata and metadata['lastBuild']: return self.get_test_build(test_instance, metadata['lastBuild']['number']) return None - @cache.memoize() def get_last_passed_test_build(self, test_instance: models.TestInstance) -> Optional[JenkinsTestBuild]: metadata = self.get_project_metadata(test_instance) if 'lastSuccessfulBuild' in metadata and metadata['lastSuccessfulBuild']: return self.get_test_build(test_instance, metadata['lastSuccessfulBuild']['number']) return None - @cache.memoize() def get_last_failed_test_build(self, test_instance: models.TestInstance) -> Optional[JenkinsTestBuild]: metadata = self.get_project_metadata(test_instance) if 'lastFailedBuild' in metadata and metadata['lastFailedBuild']: return self.get_test_build(test_instance, metadata['lastFailedBuild']['number']) return None - @cache.memoize() def test_builds(self, test_instance: models.TestInstance) -> list: builds = [] metadata = self.get_project_metadata(test_instance) @@ -112,7 +107,6 @@ def test_builds(self, test_instance: models.TestInstance) -> list: builds.append(self.get_test_build(test_instance, build_info['number'])) return builds - @cache.memoize() def get_project_metadata(self, test_instance: models.TestInstance, fetch_all_builds=False): if not hasattr(test_instance, "_raw_metadata") or test_instance._raw_metadata is None: try: @@ -122,7 +116,6 @@ def get_project_metadata(self, test_instance: models.TestInstance, fetch_all_bui raise lm_exceptions.TestingServiceException(f"{self}: {e}") return test_instance._raw_metadata - @cache.memoize() def get_test_builds(self, test_instance: models.TestInstance, limit=10) -> list: builds = [] project_metadata = self.get_project_metadata(test_instance, fetch_all_builds=(limit > 100)) @@ -132,17 +125,9 @@ def get_test_builds(self, test_instance: models.TestInstance, limit=10) -> list: builds.append(self.get_test_build(test_instance, build_info['number'])) return builds - @cache.memoize() def _get_build_info(self, test_instance: models.TestInstance, build_number: int): return self.server.get_build_info(self.get_job_name(test_instance.resource), int(build_number)) - def _disable_build_cache(func, obj: JenkinsTestingService, - test_instance: models.TestInstance, build_number: int, - *args, **kwargs): - build = JenkinsTestBuild(obj, test_instance, obj._get_build_info(test_instance, build_number)) - return build.is_running() - - @cache.memoize(timeout=Timeout.BUILDS, unless=_disable_build_cache) def get_test_build(self, test_instance: models.TestInstance, build_number: int) -> JenkinsTestBuild: try: build_metadata = self._get_build_info(test_instance, build_number) @@ -152,7 +137,6 @@ def get_test_build(self, test_instance: models.TestInstance, build_number: int) except jenkins.JenkinsException as e: raise lm_exceptions.TestingServiceException(e) - @cache.memoize() def 
get_test_build_external_link(self, test_build: models.TestBuild) -> str: return urllib.parse.urljoin(test_build.url, "console") @@ -224,7 +208,3 @@ def result(self) -> models.TestBuild.Result: @property def url(self) -> str: return self.metadata['url'] - - @property - def external_link(self) -> str: - return self.testing_service.get_test_build_external_link(self) diff --git a/lifemonitor/api/models/services/travis.py b/lifemonitor/api/models/services/travis.py index aa85bb0e3..4bd3d0a0f 100644 --- a/lifemonitor/api/models/services/travis.py +++ b/lifemonitor/api/models/services/travis.py @@ -28,12 +28,11 @@ import lifemonitor.api.models as models import requests -from lifemonitor.cache import Timeout, cache +from lifemonitor.api.models.services.service import TestingService +from lifemonitor.cache import Timeout, cached from lifemonitor.exceptions import (EntityNotFoundException, TestingServiceException) -from .service import TestingService - # set module level logger logger = logging.getLogger(__name__) @@ -86,7 +85,6 @@ def _build_url(self, path, params=None): query = "?" + urllib.parse.urlencode(params) if params else "" return urllib.parse.urljoin(self.api_base_url, path + query) - @cache.memoize() def _get(self, path, token: models.TestingServiceToken = None, params=None) -> object: logger.debug("Getting resource: %r", self._build_url(path, params)) response = requests.get(self._build_url(path, params), headers=self._build_headers(token)) @@ -104,7 +102,6 @@ def get_repo_id(test_instance: models.TestInstance, quote=True): f"Unable to get the Travis job from the resource {test_instance.resource}") return repo_id - @cache.memoize() def get_repo_slug(self, test_instance: models.TestInstance): metadata = self.get_project_metadata(test_instance) return metadata['slug'] @@ -128,32 +125,23 @@ def _get_last_test_build(self, test_instance: models.TestInstance, state=None) - except Exception as e: raise TestingServiceException(e) - @cache.memoize() - def get_instance_external_link(self, test_instance: models.TestInstance) -> str: - testing_service = test_instance.testing_service - repo_slug = testing_service.get_repo_slug(test_instance) - return urllib.parse.urljoin(testing_service.base_url, f'{repo_slug}/builds') - - @cache.memoize() def get_last_test_build(self, test_instance: models.TestInstance) -> Optional[models.TravisTestBuild]: return self._get_last_test_build(test_instance) - @cache.memoize() def get_last_passed_test_build(self, test_instance: models.TestInstance) -> Optional[models.TravisTestBuild]: return self._get_last_test_build(test_instance, state='passed') - @cache.memoize() def get_last_failed_test_build(self, test_instance: models.TestInstance) -> Optional[models.TravisTestBuild]: return self._get_last_test_build(test_instance, state='failed') - @cache.memoize() + @cached(timeout=Timeout.NONE, client_scope=False, transactional_update=True) def get_project_metadata(self, test_instance: models.TestInstance): try: + logger.debug("Getting Travis project metadata...") return self._get("/repo/{}".format(self.get_repo_id(test_instance))) except Exception as e: raise TestingServiceException(f"{self}: {e}") - @cache.memoize() def get_test_builds(self, test_instance: models.TestInstance, limit=10) -> list: try: repo_id = self.get_repo_id(test_instance) @@ -191,11 +179,14 @@ def _disable_build_cache(func, obj: TravisTestingService, build = obj._get_test_build(test_instance, build_number) return build.is_running() - @cache.memoize(timeout=Timeout.BUILDS, unless=_disable_build_cache) def 
get_test_build(self, test_instance: models.TestInstance, build_number: int) -> models.TravisTestBuild: return self._get_test_build(test_instance, build_number) - @cache.memoize() + def get_instance_external_link(self, test_instance: models.TestInstance) -> str: + testing_service = test_instance.testing_service + repo_slug = testing_service.get_repo_slug(test_instance) + return urllib.parse.urljoin(testing_service.base_url, f'{repo_slug}/builds') + def get_test_build_external_link(self, test_build: models.TestBuild) -> str: testing_service = test_build.test_instance.testing_service repo_slug = testing_service.get_repo_slug(test_build.test_instance) @@ -295,7 +286,3 @@ def result(self) -> models.TestBuild.Result: @property def url(self) -> str: return "{}{}".format(self.testing_service.url, self.metadata['@href']) - - @property - def external_link(self) -> str: - return self.testing_service.get_test_build_external_link(self) diff --git a/lifemonitor/api/models/status.py b/lifemonitor/api/models/status.py index 4792bf6c4..a4854497a 100644 --- a/lifemonitor/api/models/status.py +++ b/lifemonitor/api/models/status.py @@ -23,6 +23,7 @@ import logging import lifemonitor.exceptions as lm_exceptions +from lifemonitor.lang import messages # set module level logger logger = logging.getLogger(__name__) @@ -78,13 +79,13 @@ def check_status(suites): if len(suites) == 0: availability_issues.append({ - "issue": "No test suite configured for this workflow" + "issue": messages.no_test_suite }) for suite in suites: if len(suite.test_instances) == 0: availability_issues.append({ - "issue": f"No test instances configured for suite {suite}" + "issue": messages.no_test_instance_for_suite.format(suite) }) for test_instance in suite.test_instances: try: @@ -93,7 +94,7 @@ def check_status(suites): availability_issues.append({ "service": test_instance.testing_service.url, "test_instance": test_instance, - "issue": "No build found" + "issue": messages.no_build_found_for_instance.format(test_instance) }) else: latest_builds.append(latest_build) diff --git a/lifemonitor/api/models/testsuites/testbuild.py b/lifemonitor/api/models/testsuites/testbuild.py index ac2892149..a14abda66 100644 --- a/lifemonitor/api/models/testsuites/testbuild.py +++ b/lifemonitor/api/models/testsuites/testbuild.py @@ -25,6 +25,7 @@ from enum import Enum import lifemonitor.api.models as models +from lifemonitor.cache import CacheMixin, Timeout, cached # set module level logger logger = logging.getLogger(__name__) @@ -39,7 +40,7 @@ class BuildStatus: ABORTED = "aborted" -class TestBuild(ABC): +class TestBuild(ABC, CacheMixin): class Result(Enum): SUCCESS = 0 FAILED = 1 @@ -53,6 +54,10 @@ def __init__(self, testing_service: models.TestingService, test_instance: models def __repr__(self) -> str: return f"TestBuild '{self.id}' @ instance '{self.test_instance.uuid}'" + def __eq__(self, other): + return isinstance(other, TestBuild) \ + and self.id == other.id and self.test_instance == other.test_instance + def is_successful(self): return self.result == TestBuild.Result.SUCCESS @@ -106,9 +111,12 @@ def url(self) -> str: pass @property - @abstractmethod def external_link(self) -> str: - pass + return self.get_external_link() + + @cached(timeout=Timeout.BUILD, client_scope=False) + def get_external_link(self): + return self.testing_service.get_test_build_external_link(self) def get_output(self, offset_bytes=0, limit_bytes=131072): return self.testing_service.get_test_build_output(self.test_instance, self.id, offset_bytes, limit_bytes) @@ -123,3 +131,15 
@@ def to_dict(self, test_output=False) -> dict: if test_output: data['output'] = self.output return data + + def __getstate__(self): + return { + "testing_service": self.testing_service.uuid, + "test_instance": self.test_instance.uuid, + "metadata": self._metadata + } + + def __setstate__(self, state): + self.testing_service = models.TestingService.find_by_uuid(state['testing_service']) + self.test_instance = models.TestInstance.find_by_uuid(state['test_instance']) + self._metadata = state['metadata'] diff --git a/lifemonitor/api/models/testsuites/testinstance.py b/lifemonitor/api/models/testsuites/testinstance.py index 8c8664b2a..b49e28e23 100644 --- a/lifemonitor/api/models/testsuites/testinstance.py +++ b/lifemonitor/api/models/testsuites/testinstance.py @@ -26,6 +26,7 @@ import lifemonitor.api.models as models from lifemonitor.api.models import db +from lifemonitor.cache import Timeout, cached from lifemonitor.models import JSON, UUID, ModelMixin from .testsuite import TestSuite @@ -74,6 +75,13 @@ def __init__(self, testing_suite: TestSuite, submitter: models.User, def __repr__(self): return ''.format(self.uuid, self.test_suite.uuid) + def __eq__(self, o: object) -> bool: + return isinstance(o, TestInstance) and o.uuid == self.uuid + + @property + def _cache_key_prefix(self): + return str(self) + @property def is_roc_instance(self): return self.roc_instance is not None @@ -84,15 +92,30 @@ def managed(self): @property def external_link(self): + try: + return self.get_external_link() + except Exception: + return None + + @cached(timeout=Timeout.BUILD, client_scope=False) + def get_external_link(self): return self.testing_service.get_instance_external_link(self) @property def last_test_build(self): - return self.testing_service.get_last_test_build(self) + return self.get_last_test_build() + + @cached(timeout=Timeout.NONE, client_scope=False, transactional_update=True) + def get_last_test_build(self): + builds = self.get_test_builds() + return builds[0] if builds and len(builds) > 0 else None + @cached(timeout=Timeout.NONE, client_scope=False, transactional_update=True) def get_test_builds(self, limit=10): return self.testing_service.get_test_builds(self, limit=limit) + @cached(timeout=Timeout.BUILD, client_scope=False, transactional_update=True, + unless=lambda b: b.status in [models.BuildStatus.RUNNING, models.BuildStatus.WAITING]) def get_test_build(self, build_number): return self.testing_service.get_test_build(self, build_number) diff --git a/lifemonitor/api/models/workflows.py b/lifemonitor/api/models/workflows.py index 574ac275b..3bece861a 100644 --- a/lifemonitor/api/models/workflows.py +++ b/lifemonitor/api/models/workflows.py @@ -24,6 +24,7 @@ from typing import List, Union import lifemonitor.api.models as models +from lifemonitor.cache import Timeout, cached import lifemonitor.exceptions as lm_exceptions from lifemonitor import utils as lm_utils from lifemonitor.api.models import db @@ -221,6 +222,10 @@ def check_health(self) -> dict: @property def external_link(self) -> str: + return self.get_external_link() + + @cached(Timeout.WORKFLOW, client_scope=False) + def get_external_link(self) -> str: if self.hosting_service is None: return self.uri return self.hosting_service.get_external_link(self.workflow.external_id, self.version) @@ -237,6 +242,10 @@ def authorizations(self): def roc_link(self) -> str: return self.uri + @property + def workflow_name(self) -> str: + return self.name or self.main_entity_name or self.dataset_name + @property def is_latest(self) -> bool: return 
self.workflow.latest_version.version == self.version @@ -307,7 +316,7 @@ def get_public_workflow_version(cls, uuid, version) -> WorkflowVersion: .filter(Workflow.public == true())\ .filter(cls.version == version).one() # noqa: E712 except NoResultFound as e: - logger.exception(e) + logger.debug(e) return None except Exception as e: raise lm_exceptions.LifeMonitorException(detail=str(e), stack=str(e)) @@ -322,7 +331,7 @@ def get_user_workflow_version(cls, owner: User, uuid, version) -> WorkflowVersio .filter(Permission.user_id == owner.id)\ .filter(cls.version == version).one() except NoResultFound as e: - logger.exception(e) + logger.debug(e) return None except Exception as e: raise lm_exceptions.LifeMonitorException(detail=str(e), stack=str(e)) diff --git a/lifemonitor/api/serializers.py b/lifemonitor/api/serializers.py index 7796cfb46..9e508050a 100644 --- a/lifemonitor/api/serializers.py +++ b/lifemonitor/api/serializers.py @@ -24,10 +24,10 @@ from typing import List from urllib.parse import urljoin -from lifemonitor import utils as lm_utils from lifemonitor import exceptions as lm_exceptions +from lifemonitor import utils as lm_utils from lifemonitor.auth import models as auth_models -from lifemonitor.auth.serializers import UserSchema, SubscriptionSchema +from lifemonitor.auth.serializers import SubscriptionSchema, UserSchema from lifemonitor.serializers import (BaseSchema, ListOfItems, ResourceMetadataSchema, ResourceSchema, ma) @@ -219,9 +219,11 @@ class Meta: links = fields.Method('get_links') def get_links(self, obj): - links = { - 'origin': obj.external_link - } + links = {} + try: + links['origin'] = obj.external_link + except lm_exceptions.RateLimitExceededException: + links['origin'] = None if self._self_link: links['self'] = self.self_link return links @@ -267,6 +269,37 @@ def get_links(self, obj): return links +def format_availability_issues(status: models.WorkflowStatus): + issues = status.availability_issues + logger.info(issues) + if 'not_available' == status.aggregated_status and len(issues) > 0: + return ', '.join([f"{i['issue']}: Unable to get resource '{i['resource']}' from service '{i['service']}'" if 'service' in i else i['issue'] for i in issues]) + return None + + +class WorkflowStatusSchema(WorkflowVersionSchema): + __envelope__ = {"single": None, "many": "items"} + __model__ = models.WorkflowStatus + + class Meta: + model = models.WorkflowStatus + + aggregate_test_status = fields.String(attribute="status.aggregated_status") + latest_builds = ma.Nested(BuildSummarySchema(exclude=('meta', 'links')), + attribute="status.latest_builds", many=True) + reason = fields.Method("get_reason") + + def get_reason(self, workflow_version): + return format_availability_issues(workflow_version.status) + + @post_dump + def remove_skip_values(self, data, **kwargs): + return { + key: value for key, value in data.items() + if value is not None + } + + class WorkflowVersionListItem(WorkflowSchema): subscriptionsOf: List[auth_models.User] = None @@ -281,10 +314,14 @@ def __init__(self, *args, self_link: bool = True, subscriptionsOf: List[auth_mod def get_status(self, workflow): try: - return { + result = { "aggregate_test_status": workflow.latest_version.status.aggregated_status, "latest_build": self.get_latest_build(workflow) } + reason = format_availability_issues(workflow.latest_version.status) + if reason: + result['reason'] = reason + return result except lm_exceptions.RateLimitExceededException as e: logger.debug(e) return { @@ -330,18 +367,6 @@ def get_items(self, obj): if 
self.__item_scheme__ else None
 
 
-class WorkflowStatusSchema(WorkflowVersionSchema):
-    __envelope__ = {"single": None, "many": "items"}
-    __model__ = models.WorkflowStatus
-
-    class Meta:
-        model = models.WorkflowStatus
-
-    aggregate_test_status = fields.String(attribute="status.aggregated_status")
-    latest_builds = ma.Nested(BuildSummarySchema(exclude=('meta', 'links')),
-                              attribute="status.latest_builds", many=True)
-
-
 class SuiteSchema(ResourceMetadataSchema):
     __envelope__ = {"single": None, "many": "items"}
     __model__ = models.TestSuite
@@ -381,6 +406,17 @@ class Meta:
     suite_uuid = fields.String(attribute="suite.uuid")
     status = fields.String(attribute="aggregated_status")
     latest_builds = fields.Nested(BuildSummarySchema(exclude=('meta', 'links')), many=True)
+    reason = fields.Method("get_reason")
+
+    def get_reason(self, status):
+        return format_availability_issues(status)
+
+    @post_dump
+    def remove_skip_values(self, data, **kwargs):
+        return {
+            key: value for key, value in data.items()
+            if value is not None
+        }
 
 
 class ListOfTestInstancesSchema(ListOfItems):
diff --git a/lifemonitor/api/services.py b/lifemonitor/api/services.py
index e895145d5..4c37af45e 100644
--- a/lifemonitor/api/services.py
+++ b/lifemonitor/api/services.py
@@ -136,9 +136,15 @@ def register_workflow(cls, roc_link, workflow_submitter: User, workflow_version,
         if authorization:
             auth = ExternalServiceAuthorizationHeader(workflow_submitter, header=authorization)
             auth.resources.append(wv)
+
         if name is None:
-            w.name = wv.dataset_name
-            wv.name = wv.dataset_name
+            if wv.workflow_name is None:
+                raise lm_exceptions.LifeMonitorException(title="Missing attribute 'name'",
+                                                         detail="Attribute 'name' is not defined and it cannot be retrieved "
+                                                                "from the workflow RO-Crate (neither the 'mainEntity' name nor the root dataset name is set)",
+                                                         status=400)
+            w.name = wv.workflow_name
+            wv.name = wv.workflow_name
 
         # set workflow visibility
         w.public = public
diff --git a/lifemonitor/auth/controllers.py b/lifemonitor/auth/controllers.py
index 656b713d2..d7459d55a 100644
--- a/lifemonitor/auth/controllers.py
+++ b/lifemonitor/auth/controllers.py
@@ -23,7 +23,7 @@
 import flask
 from flask import flash, redirect, render_template, request, session, url_for
 from flask_login import login_required, login_user, logout_user
-from lifemonitor.cache import cached
+from lifemonitor.cache import cached, Timeout, clear_cache
 from lifemonitor.utils import (NextRouteRegistry, next_route_aware,
                                split_by_crlf)
 
@@ -50,7 +50,7 @@
 
 
 @authorized
-@cached(timeout=3600)
+@cached(timeout=Timeout.SESSION)
 def show_current_user_profile():
     try:
         if current_user and not current_user.is_anonymous:
@@ -66,7 +66,7 @@ def user_subscriptions_get():
 
 
 @authorized
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def get_registry_users():
     try:
         if current_registry and current_user.is_anonymous:
@@ -81,7 +81,7 @@ def get_registry_users():
 
 
 @authorized
-@cached()
+@cached(timeout=Timeout.REQUEST)
 def get_registry_user(user_id):
     try:
         if current_registry:
@@ -97,6 +97,7 @@ def index():
 
 
 @blueprint.route("/profile", methods=("GET",))
+@cached(timeout=Timeout.SESSION)
 def profile(form=None, passwordForm=None, currentView=None):
     currentView = currentView or request.args.get("currentView", 'accountsTab')
     logger.debug(OpenApiSpecs.get_instance().authorization_code_scopes)
@@ -130,6 +131,7 @@ def register():
         if user:
             login_user(user)
             flash("Account created", category="success")
+            clear_cache()
             return redirect(url_for("auth.index"))
     return render_template("auth/register.j2", form=form, action='/register', 
providers=get_providers()) @@ -152,6 +154,7 @@ def register_identity(): if user: login_user(user) flash("Account created", category="success") + clear_cache() return redirect(url_for("auth.index")) return render_template("auth/register.j2", form=form, action='/register_identity', identity=identity, user=user, providers=get_providers()) @@ -223,6 +226,7 @@ def create_apikey(): if apikey: logger.debug("Created a new API key: %r", apikey) flash("API key created!", category="success") + clear_cache() else: flash("API key not created!", category="error") return redirect(url_for('auth.profile', currentView='apiKeysTab')) @@ -237,6 +241,7 @@ def delete_apikey(): flash("Unable to find the API key") else: delete_api_key(current_user, apikey) + clear_cache() flash("API key removed!", category="success") return redirect(url_for('auth.profile', currentView='apiKeysTab')) @@ -284,6 +289,7 @@ def save_generic_code_flow_client(): data['auth_method']) logger.debug("Client updated: %r", client) flash("App Updated", category="success") + clear_cache() else: logger.debug("Ops... validation failed") return profile(form=form, currentView="oauth2ClientEditorPane") @@ -311,6 +317,7 @@ def edit_generic_code_flow_client(): logger.debug("AuthMethod: %r", form.auth_method.data) for scope in form.scopes: logger.debug("A scope: %r", scope.data) + clear_cache() return profile(form=form, currentView="oauth2ClientEditorPane") @@ -328,4 +335,5 @@ def delete_generic_code_flow_client(): flash("Unable to delete the OAuth App", category="error") else: flash("App removed!", category="success") + clear_cache() return redirect(url_for('auth.profile', currentView='oauth2ClientsTab')) diff --git a/lifemonitor/auth/forms.py b/lifemonitor/auth/forms.py index 76cc82a5c..4ae88053a 100644 --- a/lifemonitor/auth/forms.py +++ b/lifemonitor/auth/forms.py @@ -29,11 +29,16 @@ from sqlalchemy.exc import IntegrityError from wtforms import (BooleanField, HiddenField, PasswordField, SelectField, SelectMultipleField, StringField) -from wtforms.fields.html5 import URLField from wtforms.validators import URL, DataRequired, EqualTo, Optional from .models import User, db +try: + from wtforms import URLField +except ImportError: + from wtforms.fields.html5 import URLField + + # Set the module level logger logger = logging.getLogger(__name__) diff --git a/lifemonitor/auth/templates/auth/apikeys_tab.j2 b/lifemonitor/auth/templates/auth/apikeys_tab.j2 index a505d83c8..9ed4edba3 100644 --- a/lifemonitor/auth/templates/auth/apikeys_tab.j2 +++ b/lifemonitor/auth/templates/auth/apikeys_tab.j2 @@ -1,8 +1,30 @@ -
-    Allow a user to interact with the LifeMonitor API.
-    An API key acts as a static authentication token
-    that can be used to quickly
-    try API calls via the API docs interface or tools like curl.
+
+
+    Allow a user to interact with the LifeMonitor API.
+    An API key acts as a static authentication token
+    that can be used to quickly
+    try API calls via the API docs interface or tools like curl.
+
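As a quick reference for the template text above, this is the kind of call an API key enables (illustrative sketch only, not part of the patch; the base URL and the header name are assumptions):

import requests

BASE_URL = "https://api.lifemonitor.eu"  # hypothetical deployment URL

# the '/users/current' path appears in the tests later in this patch;
# the 'ApiKey' header name is an assumption
resp = requests.get(f"{BASE_URL}/users/current",
                    headers={"ApiKey": "<your-api-key>"})
resp.raise_for_status()
print(resp.json())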
diff --git a/lifemonitor/auth/templates/auth/base.j2 b/lifemonitor/auth/templates/auth/base.j2 index 0ea37cc2e..3c388e855 100644 --- a/lifemonitor/auth/templates/auth/base.j2 +++ b/lifemonitor/auth/templates/auth/base.j2 @@ -9,7 +9,10 @@ {% block stylesheets %} - + + + + diff --git a/lifemonitor/auth/templates/auth/oauth2_clients_tab.j2 b/lifemonitor/auth/templates/auth/oauth2_clients_tab.j2 index 45e52d049..e533edc44 100644 --- a/lifemonitor/auth/templates/auth/oauth2_clients_tab.j2 +++ b/lifemonitor/auth/templates/auth/oauth2_clients_tab.j2 @@ -1,9 +1,29 @@ -
-
+
+
    Allow an OAuth2 client to interact with LifeMonitor on behalf of a user.
    This authentication method is appropriate for applications that need to
    interact with LifeMonitor as a user.
-
+
+
+

OAuth2 Apps

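For contrast with the API-key tab, a sketch of the authorization-code exchange an app registered through this "OAuth2 Apps" tab would perform; endpoint URLs and credentials are invented, only the 'workflow.read' scope is taken from the spec:

from requests_oauthlib import OAuth2Session

oauth = OAuth2Session("<client-id>",
                      redirect_uri="https://myapp.example/callback",
                      scope=["workflow.read"])
auth_url, state = oauth.authorization_url("https://api.lifemonitor.eu/oauth2/authorize")
print("Authorize at:", auth_url)
# after the user is redirected back with ?code=...:
# token = oauth.fetch_token("https://api.lifemonitor.eu/oauth2/token",
#                           code="<auth-code>", client_secret="<client-secret>")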
@@ -87,4 +107,4 @@
-
+ diff --git a/lifemonitor/cache.py b/lifemonitor/cache.py index ab47fa8eb..c69239938 100644 --- a/lifemonitor/cache.py +++ b/lifemonitor/cache.py @@ -23,24 +23,381 @@ import functools import logging import os +import pickle +import threading +import time +from contextlib import contextmanager +import redis +import redis_lock +from flask import request from flask.app import Flask -from flask_caching import Cache +from flask.globals import current_app -# Set default timeouts +# Set prefix +CACHE_PREFIX = "lifemonitor-api-cache:" + + +# Set module logger +logger = logging.getLogger(__name__) + + +def _get_timeout(name: str, default: int = 0, config=None) -> int: + result = None + try: + config = current_app.config if config is None else config + if config is not None: + result = config.get(name) + except Exception as e: + logger.debug(e) + result = result if result is not None else os.environ.get(name, default) + logger.debug("Getting timeout %r: %r", name, result) + return int(result) + + +def _get_timeout_key(n: str) -> str: + return f"CACHE_{n}_TIMEOUT" class Timeout: - DEFAULT = os.environ.get('CACHE_DEFAULT_TIMEOUT', 300) - SESSION = os.environ.get('CACHE_SESSION_TIMEOUT', 3600) - BUILDS = os.environ.get('CACHE_SESSION_TIMEOUT', 84600) + # Set default timeouts + NONE = 0 + DEFAULT = _get_timeout(_get_timeout_key('DEFAULT'), default=300) + REQUEST = _get_timeout(_get_timeout_key('REQUEST'), default=30) + SESSION = _get_timeout(_get_timeout_key('SESSION'), default=3600) + WORKFLOW = _get_timeout(_get_timeout_key('WORKFLOW'), default=1800) + BUILD = _get_timeout(_get_timeout_key('BUILD'), default=300) + @classmethod + def update(cls, config=None): + for t in ('DEFAULT', 'REQUEST', 'SESSION', 'BUILD', 'WORKFLOW'): + try: + key = _get_timeout_key(t) + setattr(cls, t, _get_timeout(key, config=config)) + except Exception: + logger.debug("Error when updating timeout %r", t) -# Set module logger -logger = logging.getLogger(__name__) -# Instantiate cache manager -cache = Cache() +class IllegalStateException(RuntimeError): + pass + + +class CacheTransaction(object): + + _current_transaction = threading.local() + + @classmethod + def get_current_transaction(cls) -> CacheTransaction: + try: + return cls._current_transaction.value + except AttributeError: + return None + + @classmethod + def set_current_transaction(cls, t: CacheTransaction): + cls._current_transaction.value = t + + def __init__(self, cache: Cache, name=None): + self.__name = name or f"T-{id(self)}" + self.__cache__ = cache + self.__locks__ = {} + self.__data__ = {} + self.__started__ = False + self.__closed__ = False + + def __repr__(self) -> str: + return f"CacheTransaction#{self.name}" + + @property + def cache(self): + return self.__cache__ + + @property + def name(self): + return self.__name + + def make_key(self, key: str, prefix: str = CACHE_PREFIX) -> str: + return self.__cache__._make_key(key, prefix=prefix) + + def set(self, key: str, value, timeout: int = Timeout.REQUEST, prefix: str = CACHE_PREFIX): + self.__data__[self.make_key(key, prefix=prefix)] = (value, timeout) + + def get(self, key: str, prefix: str = CACHE_PREFIX): + data = self.__data__.get(self.make_key(key, prefix=prefix), None) + return data[0] if data is not None else None + + def keys(self): + return list(self.__data__.keys()) + + def has(self, key: str) -> bool: + if key is None: + return False + return self.make_key(key) in self.keys() + + @contextmanager + def lock(self, key: str, + timeout: int = Timeout.REQUEST, + expire=15, retry=1, auto_renewal=True): 
+ logger.debug("Getting lock for key %r...", key) + if key in self.__locks__: + yield self.__locks__[key] + else: + lock = redis_lock.Lock(self.cache.backend, key, expire=expire, auto_renewal=auto_renewal, id=self.name) + while not lock.acquire(blocking=False, timeout=timeout if timeout > 0 else None): + logger.debug("Waiting for lock key '%r'... (retry in %r secs)", lock, retry) + time.sleep(retry) + logger.debug("Lock for key '%r' acquired: %r", key, lock.locked) + self.__locks__[key] = lock + logger.debug("Lock for key '%r' added to transaction %r: %r", key, self.name, self.has_lock(key)) + try: + yield lock + finally: + logger.debug("Releasing transactional lock context for key '%s'", key) + + def has_lock(self, key: str) -> bool: + return key in self.__locks__ + + def size(self) -> int: + return len(self.__data__.keys()) + + def __enter__(self): + self.start() + return self + + def __exit__(self, type, value, traceback): + self.close() + return True + + def is_started(self) -> bool: + return self.__started__ + + def start(self): + logger.debug(f"Starting transaction {self} ...") + self.__data__.clear() + self.__locks__.clear() + self.__started__ = True + self.__closed__ = False + + def close(self): + if self.__closed__: + logger.debug(f"{self} already closed") + else: + logger.debug(f"Stopping {self}...") + try: + logger.debug("Finalizing transaction...") + pipeline = self.__cache__.backend.pipeline() + for k, data in self.__data__.items(): + logger.debug(f"Setting key {k} on transaction pipeline (timeout: {data[1]}") + pipeline.set(k, pickle.dumps(data[0]), ex=data[1] if data[1] > 0 else None) + pipeline.execute() + logger.debug("Transaction finalized!") + for k in list(self.__locks__.keys()): + lk = self.__locks__.pop(k) + if lk: + if lk.locked: + logger.debug("Releasing lock for key '%r'...", k) + try: + lk.release() + logger.debug("Lock for key '%r' released: %r", k, lk.locked) + except redis_lock.NotAcquired as e: + logger.warning(e) + else: + logger.debug("Lock for key '%s' not acquired or expired") + else: + logger.debug("No lock for key %r", k) + logger.debug(f"All lock of {self} released") + logger.debug(f"{self} closed") + except Exception as e: + logger.exception(e) + finally: + self.__closed__ = True + self.__cache__._set_current_transaction(None) + logger.debug(f"{self} finished") + + +_current_transaction = threading.local() + + +class Cache(object): + + # Enable/Disable cache + cache_enabled = True + # Ignore cache values even if cache is enabled + _ignore_cache_values = False + # Reference to Redis back-end + __cache__ = None + + @classmethod + def init_backend(cls, config): + logger.debug("Initialising cache back-end...") + logger.debug("Cache type detected: %r", config.get("CACHE_TYPE", None)) + if config.get("CACHE_TYPE", None) == "flask_caching.backends.rediscache.RedisCache": + logger.debug("Configuring Redis back-end...") + cls.__cache__ = redis.Redis.from_url(config.get("CACHE_REDIS_URL")) + cls.cache_enabled = True + else: + logger.debug("No cache") + cls.__cache__ = None + cls.cache_enabled = False + return cls.__cache__ + + @classmethod + def get_backend(cls) -> redis.Redis: + if cls.__cache__ is None: + raise IllegalStateException("Back-end not initialized!") + return cls.__cache__ + + @classmethod + def init_app(cls, app: Flask): + cls.init_backend(app.config) + if cls.__cache__ is not None: + cls.reset_locks() + + def __init__(self, parent: Cache = None) -> None: + self._local = _current_transaction + self._parent = parent + + @staticmethod + def 
_make_key(key: str, prefix: str = CACHE_PREFIX) -> str: + return f"{prefix}{key}" + + @property + def parent(self) -> Cache: + return self._parent + + @property + def ignore_cache_values(self): + return self._ignore_cache_values is True and \ + (self.parent and self.parent.ignore_cache_values is True) + + @ignore_cache_values.setter + def ignore_cache_values(self, value: bool): + self._ignore_cache_values = True if value is True else False + + def _set_current_transaction(self, t: CacheTransaction): + CacheTransaction.set_current_transaction(t) + + def get_current_transaction(self) -> CacheTransaction: + return CacheTransaction.get_current_transaction() + + @contextmanager + def transaction(self, name=None) -> CacheTransaction: + new_transaction = False + t = self.get_current_transaction() + if t is None: + logger.debug("Creating a new transaction...") + t = CacheTransaction(self, name=name) + self._set_current_transaction(t) + new_transaction = True + else: + logger.debug("Reusing transaction in the current thread: %r", t) + try: + yield t + finally: + logger.debug("Finally closing transaction") + if not new_transaction: + logger.debug("Transaction not initialized in this context: it should continue") + else: + try: + t.close() + except Exception as fe: + logger.debug(fe) + self._set_current_transaction(None) + + @property + def backend(self) -> redis.Redis: + return self.get_backend() + + def keys(self, pattern: str = None): + query = f"{CACHE_PREFIX}" + if pattern is not None: + query = f"{query}{pattern}" + else: + query = f"{query}*" + logger.debug("Keys pattern: %r", query) + return self.backend.keys(query) + + def size(self, pattern=None): + return len(self.keys(pattern=pattern)) + + def to_dict(self, pattern=None): + return {k: self.backend.get(k) for k in self.keys(pattern=pattern)} + + @contextmanager + def lock(self, key: str, + timeout: int = Timeout.REQUEST, + expire=15, retry=1, auto_renewal=True): + logger.debug("Getting lock for key %r...", key) + lock = redis_lock.Lock(self.backend, key, expire=expire, auto_renewal=auto_renewal) + try: + while not lock.acquire(blocking=False, timeout=timeout if timeout > 0 else None): + logger.debug("Waiting to acquire the lock for '%r'... (retry in %r secs)", lock, retry) + time.sleep(retry) + logger.debug(f"Lock for key '{key}' acquired: {lock.locked}") + yield lock + finally: + try: + logger.debug("Exiting from transactional lock context for key '%s'", key) + if not lock.locked: + logger.debug("Lock for key '%s' not acquired", key) + else: + logger.debug("Auto release of lock for key '%s'", key) + lock.release() + logger.debug("Lock for key='%s' released: %r", key, lock.locked) + except redis_lock.NotAcquired as e: + logger.debug(e) + + def set(self, key: str, value, timeout: int = Timeout.NONE, prefix: str = CACHE_PREFIX): + if key is not None and self.cache_enabled: + key = self._make_key(key, prefix=prefix) + logger.debug("Setting cache value for key %r.... 
(timeout: %r)", key, timeout) + if value is None: + self.backend.delete(key) + else: + self.backend.set(key, pickle.dumps(value), ex=timeout if timeout > 0 else None) + + def has(self, key: str, prefix: str = CACHE_PREFIX) -> bool: + return self.get(key, prefix=prefix) is not None + + def _get_status(self) -> dict: + return { + "self": self, + "enabled": self.cache_enabled, + "ignore_values": self.ignore_cache_values, + "current_transaction": self.get_current_transaction(), + "transaction locks": self.get_current_transaction().__locks__ if self.get_current_transaction() else None + } + + def get(self, key: str, prefix: str = CACHE_PREFIX): + logger.debug("Getting value from cache...") + logger.debug("Cache status: %r", self._get_status()) + if not self.cache_enabled or self.ignore_cache_values: + return None + data = self.backend.get(self._make_key(key, prefix=prefix)) + logger.debug("Current cache data: %r", data is not None) + return pickle.loads(data) if data is not None else data + + def delete_keys(self, pattern: str, prefix: str = CACHE_PREFIX): + logger.debug(f"Deleting keys by pattern: {pattern}") + if self.cache_enabled: + logger.debug("Redis backend detected!") + logger.debug(f"Pattern: {prefix}{pattern}") + for key in self.backend.scan_iter(self._make_key(pattern, prefix=prefix)): + logger.debug("Delete key: %r", key) + self.backend.delete(key) + + def clear(self): + for key in self.backend.scan_iter(f"{CACHE_PREFIX}*"): + self.backend.delete(key) + self.reset_locks() + + @classmethod + def reset_locks(cls): + redis_lock.reset_all(cls.get_backend()) + + +# global cache instance +cache: Cache = Cache() def init_cache(app: Flask): @@ -51,10 +408,11 @@ def init_cache(app: Flask): logger.debug("Cache type detected: %s", cache_type) if cache_type == 'flask_caching.backends.rediscache.RedisCache': logger.debug("Configuring cache...") - app.config.setdefault('CACHE_REDIS_HOST', os.environ.get('REDIS_HOST', 'redis')) + app.config.setdefault('CACHE_REDIS_HOST', os.environ.get('REDIS_HOST', '127.0.0.1')) app.config.setdefault('CACHE_REDIS_PORT', os.environ.get('REDIS_PORT_NUMBER', 6379)) - app.config.setdefault('CACHE_REDIS_PASSWORD', os.environ.get('REDIS_PASSWORD', '')) + app.config.setdefault('CACHE_REDIS_PASSWORD', os.environ.get('REDIS_PASSWORD', 'foobar')) app.config.setdefault('CACHE_REDIS_DB', int(os.environ.get('CACHE_REDIS_DB', 0))) + app.config.setdefault("CACHE_KEY_PREFIX", CACHE_PREFIX) app.config.setdefault('CACHE_REDIS_URL', "redis://:{0}@{1}:{2}/{3}".format( app.config.get('CACHE_REDIS_PASSWORD'), app.config.get('CACHE_REDIS_HOST'), @@ -63,60 +421,129 @@ def init_cache(app: Flask): )) logger.debug("RedisCache connection url: %s", app.config.get('CACHE_REDIS_URL')) cache.init_app(app) + Timeout.update(app.config) logger.debug(f"Cache initialised (type: {cache_type})") -def _make_name(fname) -> str: +def make_cache_key(func=None, client_scope=True, args=None, kwargs=None) -> str: from lifemonitor.auth import current_registry, current_user - result = fname - if current_user and not current_user.is_anonymous: - result += "-{}-{}".format(current_user.username, current_user.id) - if current_registry: - result += "-{}".format(current_registry.uuid) - logger.debug("Calculated function name: %r", result) + hash_enabled = not logger.isEnabledFor(logging.DEBUG) + fname = "" if func is None \ + else func if isinstance(func, str) \ + else f"{func.__module__}.{func.__name__}" if callable(func) else str(func) + logger.debug("make_key func: %r", fname) + logger.debug("make_key args: %r", 
args) + logger.debug("make_key kwargs: %r", kwargs) + logger.debug("make_key hash enabled: %r", hash_enabled) + result = "" + if client_scope: + client_id = "" + if current_user and not current_user.is_anonymous: + client_id += "{}-{}_".format(current_user.username, current_user.id) + if current_registry: + client_id += "{}_".format(current_registry.uuid) + if not current_registry and (not current_user or current_user.is_anonymous): + client_id += "anonymous" + if request: + client_id += f"@{request.remote_addr}" + result += f"{hash(client_id) if hash_enabled else client_id}::" + if func: + result += fname + if args: + args_str = "-".join([str(_) for _ in args]) + result += f"#{hash(args_str) if hash_enabled else args_str}" + if kwargs: + kwargs_str = "-".join([f"{k}={str(v)}" for k, v in kwargs.items()]) + result += f"#{hash(kwargs_str) if hash_enabled else kwargs_str}" + logger.debug("make_key calculated key: %r", result) return result -def clear_cache(func=None, *args, **kwargs): +def clear_cache(func=None, client_scope=True, prefix=CACHE_PREFIX, *args, **kwargs): try: if func: - cache.delete_memoized(func, *args, **kwargs) + key = make_cache_key(func, client_scope) + cache.delete_keys(f"{key}*") + if args or kwargs: + key = make_cache_key(func, client_scope=client_scope, args=args, kwargs=kwargs) + cache.delete_keys(f"{key}*", prefix=prefix) else: - cache.clear() + key = make_cache_key(client_scope=client_scope) + cache.delete_keys(f"{key}*", prefix=prefix) except Exception as e: logger.error("Error deleting cache: %r", e) -def cached(timeout=Timeout.DEFAULT, unless=False): +def _process_cache_data(cache, transaction, key, unless, timeout, + read_from_cache, write_to_cache, function, args, kwargs): + # check parameters + assert read_from_cache or transaction, "Unable to read from transaction: transaction is None" + assert write_to_cache or transaction, "Unable to write to transaction: transaction is None" + # set reader/writer + reader = cache if read_from_cache else transaction + writer = cache if write_to_cache else transaction + # get/set data + result = reader.get(key) + if result is None: + logger.debug(f"Value {key} not set in cache...") + with cache.lock(key, timeout=Timeout.NONE): + result = reader.get(key) + if not result: + logger.debug("Cache empty: getting value from the actual function...") + result = function(*args, **kwargs) + logger.debug("Checking unless function: %r", unless) + if unless is None or unless is False or callable(unless) and not unless(result): + writer.set(key, result, timeout=timeout) + else: + logger.debug("Don't set value in cache due to unless=%r", + "None" if unless is None else "True") + else: + logger.debug(f"Reusing value from cache key '{key}'...") + return result + + +def cached(timeout=Timeout.REQUEST, client_scope=True, unless=None, transactional_update=False): def decorator(function): - @cache.memoize(timeout=timeout, unless=unless, make_name=_make_name) @functools.wraps(function) def wrapper(*args, **kwargs): - logger.debug("Cache arguments: %r", args) - logger.debug("Caghe kwargs: %r", kwargs) - # wrap concrete function - return function(*args, **kwargs) + logger.debug("Args: %r", args) + logger.debug("KwArgs: %r", kwargs) + obj: CacheMixin = args[0] if len(args) > 0 and isinstance(args[0], CacheMixin) else None + logger.debug("Wrapping a method of a CacheMixin instance: %r", obj is not None) + hc = cache if obj is None else obj.cache + result = None + if hc and hc.cache_enabled: + key = make_cache_key(function, client_scope, args=args, 
kwargs=kwargs) + transaction = hc.get_current_transaction() + if transaction or transactional_update: + read_from_cache = transaction is None + with hc.transaction() as transaction: + logger.debug("Getting value using transaction: new=%r", read_from_cache) + result = _process_cache_data(cache, transaction, + key, unless, timeout, + read_from_cache, False, + function, args, kwargs) + else: + logger.debug("Getting value from cache") + result = _process_cache_data(cache, transaction, key, unless, timeout, + True, True, function, args, kwargs) + else: + logger.debug("Cache disabled: getting value from the actual function...") + result = function(*args, **kwargs) + return result return wrapper return decorator -def cached_method(timeout=None, unless=False): - def decorator(function): +class CacheMixin(object): - def unless_wrapper(func, obj, *args, **kwargs): - f = getattr(obj, unless) - return f(obj, func, *args, **kwargs) + _cache: Cache = None - @cache.memoize(timeout=timeout, unless=unless_wrapper, make_name=_make_name) - @functools.wraps(function) - def wrapper(*args, **kwargs): - logger.debug("Cache arguments: %r", args) - logger.debug("Caghe kwargs: %r", kwargs) - # wrap concrete function - return function(*args, **kwargs) - - return wrapper - return decorator + @property + def cache(self) -> Cache: + if self._cache is None: + self._cache = Cache(parent=cache) + return self._cache diff --git a/lifemonitor/commands/cache.py b/lifemonitor/commands/cache.py new file mode 100644 index 000000000..36180b668 --- /dev/null +++ b/lifemonitor/commands/cache.py @@ -0,0 +1,45 @@ +# Copyright (c) 2020-2021 CRS4 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ + +import logging +from flask.blueprints import Blueprint +from flask.cli import with_appcontext + +# set module level logger +logger = logging.getLogger() + +# define the blueprint for DB commands +blueprint = Blueprint('cache', __name__) + + +@blueprint.cli.command('clear') +@with_appcontext +def clear(): + """ + Delete API cache + """ + from lifemonitor.cache import cache + try: + cache.clear() + except Exception as e: + print("Error when deleting cache: %s" % (str(e))) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) diff --git a/lifemonitor/commands/tasks.py b/lifemonitor/commands/tasks.py new file mode 100644 index 000000000..ef35a4a96 --- /dev/null +++ b/lifemonitor/commands/tasks.py @@ -0,0 +1,46 @@ +# Copyright (c) 2020-2021 CRS4 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
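The two new blueprints ('cache' above, 'task-queue' just below) hook into the Flask CLI; one way to exercise them from Python, assuming the package exposes a create_app factory (the factory path is an assumption, not shown in this patch):

from lifemonitor.app import create_app  # assumed factory location

app = create_app()
runner = app.test_cli_runner()
print(runner.invoke(args=["cache", "clear"]).output)       # i.e. `flask cache clear`
print(runner.invoke(args=["task-queue", "reset"]).output)  # i.e. `flask task-queue reset`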
+ + +import logging +from flask.blueprints import Blueprint +from flask.cli import with_appcontext + +# set module level logger +logger = logging.getLogger() + +# define the blueprint for DB commands +blueprint = Blueprint('task-queue', __name__) + + +@blueprint.cli.command('reset') +@with_appcontext +def reset(): + """ + Reset task-queue status + """ + from lifemonitor.cache import clear_cache + from lifemonitor.tasks.task_queue import REDIS_NAMESPACE + try: + clear_cache(client_scope=False, prefix=REDIS_NAMESPACE) + except Exception as e: + print("Error when deleting cache: %s" % (str(e))) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) diff --git a/lifemonitor/config.py b/lifemonitor/config.py index 226853883..59546b948 100644 --- a/lifemonitor/config.py +++ b/lifemonitor/config.py @@ -104,7 +104,8 @@ class TestingConfig(BaseConfig): TESTING = True LOG_LEVEL = "DEBUG" # SQLALCHEMY_DATABASE_URI = "sqlite:///{0}/app-test.db".format(basedir) - CACHE_TYPE = "flask_caching.backends.nullcache.NullCache" + # CACHE_TYPE = "flask_caching.backends.nullcache.NullCache" + CACHE_TYPE = "flask_caching.backends.rediscache.RedisCache" class TestingSupportConfig(TestingConfig): diff --git a/lifemonitor/exceptions.py b/lifemonitor/exceptions.py index a4470023f..f0cda481b 100644 --- a/lifemonitor/exceptions.py +++ b/lifemonitor/exceptions.py @@ -50,7 +50,8 @@ def __init__(self, title=None, detail=None, pass def __repr__(self): - return f"[{self.status}] {self.title}: {self.detail}" + detail = f": {self.detail}" if self.detail else "" + return f"[{self.status}] {self.title}{detail}" def __str__(self): return self.__repr__() @@ -176,7 +177,7 @@ def __init__(self, title="Testing service error", detail="", class RateLimitExceededException(TestingServiceException): def __init__(self, detail=None, type="about:blank", status=403, instance=None, **kwargs): - super().__init__(title="RateLimitExceededException", + super().__init__(title="Rate Limit Exceeded", detail=detail, status=status, **kwargs) diff --git a/lifemonitor/lang/messages.py b/lifemonitor/lang/messages.py index 4f060e5e9..3d53ca304 100644 --- a/lifemonitor/lang/messages.py +++ b/lifemonitor/lang/messages.py @@ -25,6 +25,9 @@ no_registry_found = "Unable to find the registry {}" no_submitter_id_provided = "The registry client should provide a 'submitter_id'" no_user_oauth_identity_on_registry = "Unable to link the identity of user '{}' on the registry '{}' (not authorized yet)" +no_test_suite = "No test suite configured for this workflow" +no_build_found_for_instance = "No build found for instance {}" +no_test_instance_for_suite = "No test instances configured for suite {}" not_authorized_registry_access = "User not authorized to access the registry '{}'" not_authorized_workflow_access = "User not authorized to get workflow data" input_data_missing = "One or more input data are missing" diff --git a/lifemonitor/models.py b/lifemonitor/models.py index b67532368..b38ac3863 100644 --- a/lifemonitor/models.py +++ b/lifemonitor/models.py @@ -24,10 +24,11 @@ from typing import List from lifemonitor.db import db +from lifemonitor.cache import CacheMixin from sqlalchemy import types -class ModelMixin(object): +class ModelMixin(CacheMixin): def refresh(self, **kwargs): db.session.refresh(self, **kwargs) diff --git a/lifemonitor/static/img/logo/openapi-custom-colors.svg b/lifemonitor/static/img/logo/openapi-custom-colors.svg new file mode 100644 index 000000000..e5e11f72b --- /dev/null +++ b/lifemonitor/static/img/logo/openapi-custom-colors.svg @@ 
-0,0 +1 @@ + \ No newline at end of file diff --git a/lifemonitor/static/img/logo/openapi.svg b/lifemonitor/static/img/logo/openapi.svg new file mode 100644 index 000000000..50ee1e39f --- /dev/null +++ b/lifemonitor/static/img/logo/openapi.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/lifemonitor/static/src/package.json b/lifemonitor/static/src/package.json index 73682ab85..ba2afb2a3 100644 --- a/lifemonitor/static/src/package.json +++ b/lifemonitor/static/src/package.json @@ -1,7 +1,7 @@ { "name": "lifemonitor", "description": "Workflow Testing Service", - "version": "0.2.0", + "version": "0.4.0", "license": "MIT", "author": "CRS4", "main": "../dist/js/lifemonitor.min.js", diff --git a/lifemonitor/tasks/task_queue.py b/lifemonitor/tasks/task_queue.py index 0b2952241..cf76c29cf 100644 --- a/lifemonitor/tasks/task_queue.py +++ b/lifemonitor/tasks/task_queue.py @@ -10,6 +10,8 @@ from dramatiq.results.backends.redis import RedisBackend from flask_apscheduler import APScheduler +REDIS_NAMESPACE = 'dramatiq' + logger = logging.getLogger(__name__) @@ -53,8 +55,8 @@ def init_task_queue(app): port=int(app.config.get("REDIS_PORT_NUMBER", 6379))) logger.info("Setting up task queue. Pointing to broker %s:%s", redis_connection_params['host'], redis_connection_params['port']) - redis_broker = RedisBroker(**redis_connection_params) - result_backend = RedisBackend(**redis_connection_params) + redis_broker = RedisBroker(namespace=f"{REDIS_NAMESPACE}", **redis_connection_params) + result_backend = RedisBackend(namespace=f"{REDIS_NAMESPACE}-results", **redis_connection_params) redis_broker.add_middleware(Results(backend=result_backend)) dramatiq.set_broker(redis_broker) redis_broker.add_middleware(AppContextMiddleware(app)) @@ -64,7 +66,8 @@ def init_task_queue(app): logger.info("Starting job scheduler") app.scheduler = APScheduler() app.scheduler.init_app(app) - from . import tasks # noqa: F401 imported for its side effects - it defines the tasks + if app.config.get('ENV') not in ['testingSupport', 'testing']: + from . 
import tasks # noqa: F401 imported for its side effects - it defines the tasks app.scheduler.start() # Shut down the scheduler when exiting the app atexit.register(app.scheduler.shutdown) diff --git a/lifemonitor/tasks/tasks.py b/lifemonitor/tasks/tasks.py index 47756a929..3c04a927b 100644 --- a/lifemonitor/tasks/tasks.py +++ b/lifemonitor/tasks/tasks.py @@ -4,6 +4,8 @@ import dramatiq import flask from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from lifemonitor.cache import Timeout # set module level logger logger = logging.getLogger(__name__) @@ -36,6 +38,68 @@ def decorator(actor): @schedule(CronTrigger(second=0)) -@dramatiq.actor +@dramatiq.actor(max_retries=3) def heartbeat(): logger.info("Heartbeat!") + + +@schedule(IntervalTrigger(seconds=Timeout.WORKFLOW * 3 / 4)) +@dramatiq.actor(max_retries=3) +def check_workflows(): + from flask import current_app + from lifemonitor.api.controllers import workflows_rocrate_download + from lifemonitor.api.models import Workflow + from lifemonitor.auth.services import login_user, logout_user + + logger.info("Starting 'check_workflows' task....") + for w in Workflow.all(): + try: + for v in w.versions.values(): + with v.cache.transaction(str(v)): + logger.info("Updating external link: %r", v.external_link) + u = v.submitter + with current_app.test_request_context(): + try: + if u is not None: + login_user(u) + logger.info("Updating RO-Crate...") + workflows_rocrate_download(w.uuid, v.version) + logger.info("Updating RO-Crate... DONE") + except Exception as e: + logger.error(f"Error when updating the workflow {w}: {str(e)}") + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) + finally: + try: + logout_user() + except Exception as e: + logger.debug(e) + except Exception as e: + logger.error("Error when executing task 'check_workflows': %s", str(e)) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) + logger.info("Starting 'check_workflows' task.... 
DONE!") + + +@schedule(IntervalTrigger(seconds=Timeout.BUILD * 3 / 4)) +@dramatiq.actor(max_retries=3) +def check_last_build(): + from lifemonitor.api.models import Workflow + + logger.info("Starting 'check_last build' task...") + for w in Workflow.all(): + try: + for s in w.latest_version.test_suites: + logger.info("Updating workflow: %r", w) + for i in s.test_instances: + with i.cache.transaction(str(i)): + builds = i.get_test_builds() + logger.debug("Updating latest builds: %r", builds) + for b in builds: + logger.debug("Updating build: %r", i.get_test_build(b.id)) + logger.debug("Updating latest build: %r", i.last_test_build) + except Exception as e: + logger.error("Error when executing task 'check_last_build': %s", str(e)) + if logger.isEnabledFor(logging.DEBUG): + logger.exception(e) + logger.info("Checking last build: DONE!") diff --git a/lifemonitor/utils.py b/lifemonitor/utils.py index 79898e6ab..cba1b2723 100644 --- a/lifemonitor/utils.py +++ b/lifemonitor/utils.py @@ -453,8 +453,18 @@ def _load_concrete_types(self): logger.exception(e) return self.__concrete_types__ + @property + def _concrete_types(self): + return self._load_concrete_types() + + def add_class(self, type_name, type_class): + self._concrete_types[type_name] = (type_class,) + + def remove_class(self, type_name): + return self._concrete_types.pop(type_name, None) + def get_class(self, concrete_type): - return self._load_concrete_types()[concrete_type][0] + return self._concrete_types[concrete_type][0] def get_classes(self): - return [_[0] for _ in self._load_concrete_types().values()] + return [_[0] for _ in self._concrete_types.values()] diff --git a/migrations/versions/861eca55901d_fix_workflows_with_no_name.py b/migrations/versions/861eca55901d_fix_workflows_with_no_name.py new file mode 100644 index 000000000..6319fd8fd --- /dev/null +++ b/migrations/versions/861eca55901d_fix_workflows_with_no_name.py @@ -0,0 +1,24 @@ +"""Fix workflows with no name + +Revision ID: 861eca55901d +Revises: 01684f92a380 +Create Date: 2021-10-30 15:51:52.296778 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. 
+revision = '861eca55901d' +down_revision = '01684f92a380' +branch_labels = None +depends_on = None + + +def upgrade(): + bind = op.get_bind() + bind.execute("update resource set name='unknown' where id in (select id from workflow natural join resource where name='')") + + +def downgrade(): + pass diff --git a/requirements.txt b/requirements.txt index 462a06a40..edcd4c8ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,8 +13,6 @@ flask-wtf~=0.15.1 Flask-APScheduler==1.12.2 Flask-SQLAlchemy==2.5.1 Flask-Migrate==3.1.0 -Flask-Caching==1.10.1 -flask-wtf~=0.15.1 Flask>=1.1.4,<2.0.0 gunicorn~=20.1.0 jwt==1.2.0 @@ -27,10 +25,13 @@ pytest-mock~=3.6.1 pytest~=6.2.5 python-dotenv~=0.19.0 python-jenkins==1.7.0 +python-redis-lock~=3.7.0 PyGithub~=1.55 PyYAML~=5.4.1 +pika~=1.2.0 redis~=3.5.3 requests~=2.26.0 rocrate~=0.4.0 SQLAlchemy~=1.3.23 wheel~=0.37.0 + diff --git a/settings.conf b/settings.conf index 56fd419a8..33119552e 100644 --- a/settings.conf +++ b/settings.conf @@ -35,7 +35,7 @@ FLASK_ENV=development LIFEMONITOR_ADMIN_PASSWORD=admin # PostgreSQL DBMS settings -#POSTGRESQL_HOST=0.0.0.0 +POSTGRESQL_HOST=db POSTGRESQL_PORT=5432 POSTGRESQL_DATABASE=lm POSTGRESQL_USERNAME=lm @@ -47,6 +47,7 @@ GUNICORN_THREADS=2 # Dramatiq worker settings WORKER_PROCESSES=1 +WORKER_THREADS=3 # Redis settings REDIS_HOST=redis @@ -56,8 +57,10 @@ REDIS_PORT_NUMBER=6379 # Cache settings CACHE_REDIS_DB=0 CACHE_DEFAULT_TIMEOUT=300 +CACHE_REQUEST_TIMEOUT=15 CACHE_SESSION_TIMEOUT=3600 -CACHE_BUILDS_TIMEOUT=84600 +CACHE_WORKFLOW_TIMEOUT=1800 +CACHE_BUILD_TIMEOUT=84600 # Github OAuth2 settings #GITHUB_CLIENT_ID="___YOUR_GITHUB_OAUTH2_CLIENT_ID___" diff --git a/specs/api.yaml b/specs/api.yaml index 737e18da5..88d712936 100644 --- a/specs/api.yaml +++ b/specs/api.yaml @@ -3,7 +3,7 @@ openapi: "3.0.0" info: - version: "0.3.0" + version: "0.4.0" title: "Life Monitor API" description: | *Workflow sustainability service* @@ -18,7 +18,7 @@ info: servers: - url: / description: > - Version 0.3.0 of API. + Version 0.4.0 of API. 
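The CACHE_*_TIMEOUT settings introduced above feed Timeout.update() in lifemonitor/cache.py: the app config is consulted first, then the environment, then the built-in default. A small sketch of that resolution order:

import os
from lifemonitor.cache import Timeout

os.environ["CACHE_WORKFLOW_TIMEOUT"] = "900"
Timeout.update(config={})      # empty config, falls back to the env var
assert Timeout.WORKFLOW == 900

Timeout.update(config={"CACHE_WORKFLOW_TIMEOUT": 1800})   # config value wins
assert Timeout.WORKFLOW == 1800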
tags: - name: Registries @@ -969,7 +969,7 @@ paths: # valueB: "#/components/schemas/ManagedTestInstanceCreationData" responses: "201": - description: Instance created by this operation + $ref: "#/components/responses/TestInstanceRegistered" "400": $ref: "#/components/responses/BadRequest" "401": @@ -1101,47 +1101,6 @@ paths: "404": $ref: "#/components/responses/NotFound" - /instances/{instance_uuid}/builds/{build_id}/logs: - get: - summary: "Get test instance build logs" - description: "Get the build logs for the specified test instance and build" - x-openapi-router-controller: lifemonitor.api.controllers - operationId: "instances_builds_get_logs" - tags: ["Test Instances"] - deprecated: true - security: - - apiKey: ["workflow.read"] - - RegistryClientCredentials: ["workflow.read"] - - RegistryCodeFlow: ["workflow.read"] - - AuthorizationCodeFlow: ["workflow.read"] - - {} - parameters: - - $ref: "#/components/parameters/instance_uuid" - - $ref: "#/components/parameters/build_id" - - $ref: "#/components/parameters/offset_bytes" - - $ref: "#/components/parameters/limit_bytes" - responses: - "200": - description: "Log data" - content: - application/json: - schema: - type: string - description: | - Log messages from the test build - example: | - [15/April/2021:13:55:36 -0700] "GET /workflow.cwl HTTP/1.0" 200 2326 - [15/April/2021:13:55:36 -0700] "GET /workflow.cwl HTTP/1.0" 200 2326 - [15/April/2021:13:55:36 -0700] "GET /workflow.cwl HTTP/1.0" 200 2326 - "400": - $ref: "#/components/responses/BadRequest" - "401": - $ref: "#/components/responses/Unauthorized" - "403": - $ref: "#/components/responses/Forbidden" - "404": - $ref: "#/components/responses/NotFound" - components: parameters: registry_uuid: @@ -1260,22 +1219,6 @@ components: minimum: 1 default: 10 description: "Maximum number of items to retrieve" - limit_bytes: - name: "limit_bytes" - description: "Maximum number of log bytes to retrieve" - in: query - schema: - type: integer - minimum: 1 - default: 131072 # 128 kB - offset_bytes: - name: "offset_bytes" - description: "Number of bytes to skip while fetching the log" - in: query - schema: - type: integer - minimum: 0 - default: 0 responses: NotFound: @@ -1327,6 +1270,20 @@ components: schema: $ref: "#/components/schemas/Workflow" + TestInstanceRegistered: + description: A new test instance has been registered. 
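From a client's perspective, the spec changes above amount to: instance registration now answers 201 with the new instance uuid, and status payloads may carry a nullable 'reason'. A hedged sketch (the base URL, header name, and POST path are assumptions; the status query mirrors the tests later in this patch):

import requests

BASE = "https://api.lifemonitor.eu"        # hypothetical deployment
HEADERS = {"ApiKey": "<your-api-key>"}     # assumed header name

# 201 -> the body now carries the uuid of the registered test instance
r = requests.post(f"{BASE}/suites/<suite_uuid>/instances",   # assumed path
                  json={"name": "test1",
                        "service": {"type": "jenkins",
                                    "url": "https://ci.example.org"}},
                  headers=HEADERS)
if r.status_code == 201:
    print("registered instance:", r.json()["uuid"])

# workflow status may now explain why it is 'not_available'
wfs = requests.get(f"{BASE}/workflows?status=true", headers=HEADERS).json()
for w in wfs["items"]:
    if w["status"].get("reason"):
        print(w["uuid"], "->", w["status"]["reason"])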
+ content: + application/json: + schema: + type: object + properties: + uuid: + type: string + description: | + Universal unique identifier of the test instance + readOnly: true + example: ba5bbdc3-d9fb-4381-a1d8-96a8ac5594d7 + schemas: User: type: object @@ -1786,6 +1743,10 @@ components: type: array items: $ref: "#/components/schemas/BuildSummary" + reason: + description: "Reason why the status is unavailable" + type: string + nullable: true required: - aggregate_test_status - version @@ -1805,6 +1766,10 @@ components: type: array items: $ref: "#/components/schemas/BuildSummary" + reason: + description: "Reason why the status is unavailable" + type: string + nullable: true required: - version - workflow @@ -1890,6 +1855,10 @@ components: type: array items: $ref: "#/components/schemas/BuildSummary" + reason: + description: Reason why the status is unavailable + type: string + nullable: true required: - suite_uuid - status @@ -2054,6 +2023,7 @@ components: type: string description: Link to the test instance on the testing service example: "https://github.com/crs4/life_monitor/workflows/docs.yaml" + nullable: true required: - service - resource diff --git a/tests/config/data/make-test-rocrates.py b/tests/config/data/make-test-rocrates.py index 86d30e7f3..9953deaaa 100644 --- a/tests/config/data/make-test-rocrates.py +++ b/tests/config/data/make-test-rocrates.py @@ -49,6 +49,9 @@ test_crates.append(('ro-crate-galaxy-sortchangecase', 'ro-crate-galaxy-sortchangecase-invalid-service-type')) test_crates.append(('ro-crate-galaxy-sortchangecase', 'ro-crate-galaxy-sortchangecase-invalid-service-url')) test_crates.append(('ro-crate-galaxy-sortchangecase', 'ro-crate-galaxy-sortchangecase-github-actions')) +test_crates.append(('ro-crate-galaxy-sortchangecase', 'ro-crate-galaxy-sortchangecase-no-name')) + +test_crates.append(('ro-crate-galaxy-sortchangecase', 'ro-crate-galaxy-sortchangecase-rate-limit-exceeded')) # clean up RO-Crates folder if os.path.exists(crates_target_path): @@ -94,6 +97,27 @@ def patch_metadata_graph_node(metadata_file, node, properties): } }) + +patch_metadata_graph_node('crates/ro-crate-galaxy-sortchangecase-rate-limit-exceeded/ro-crate-metadata.json', + node=("@type", "TestInstance"), + properties={ + 'url': 'http://ratelimit:8080/', + 'resource': 'job/test/', + "runsOn": { + "@id": "https://w3id.org/ro/terms/test#RateLimitExceededService" + } + }) + +patch_metadata_graph_node('crates/ro-crate-galaxy-sortchangecase-rate-limit-exceeded/ro-crate-metadata.json', + node=("@type", "TestService"), + properties={ + "@id": "https://w3id.org/ro/terms/test#RateLimitExceededService", + "name": "RateLimit", + "url": { + "@id": "http://ratelimit:8080" + } + }) + patch_metadata_graph_node('crates/ro-crate-galaxy-sortchangecase-travis/ro-crate-metadata.json', node=("name", "sort-and-change-case"), properties={ @@ -168,6 +192,17 @@ def patch_metadata_graph_node(metadata_file, node, properties): "url": {"@id": "https://github.com"} }) +patch_metadata_graph_node('crates/ro-crate-galaxy-sortchangecase-no-name/ro-crate-metadata.json', + node=("@type", "Dataset"), + properties={ + 'name': None + }) +patch_metadata_graph_node('crates/ro-crate-galaxy-sortchangecase-no-name/ro-crate-metadata.json', + node=("@id", "sort-and-change-case.ga"), + properties={ + 'name': None + }) + # create zip archives print("Creating RO-Crate archives:") for c in test_crates: diff --git a/tests/conftest.py b/tests/conftest.py index 6c3e733bf..027a02985 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,16 
+25,23 @@ import re import string import uuid +from collections.abc import Iterable from unittest.mock import MagicMock import lifemonitor.db as lm_db import pytest from lifemonitor import auth -from lifemonitor.api.models import TestSuite, User +from lifemonitor.api.models import (TestingService, TestingServiceTokenManager, + TestSuite, User) from lifemonitor.api.services import LifeMonitor +from lifemonitor.cache import cache, clear_cache +from lifemonitor.utils import ClassManager + +from tests.utils import register_workflow from . import conftest_helpers as helpers from .conftest_types import ClientAuthenticationMethod, RegistryType +from .rate_limit_exceeded import RateLimitExceededTestingService # set the module level logger logger = logging.getLogger(__name__) @@ -64,9 +71,47 @@ def headers(): return helpers.get_headers() +@pytest.fixture +def lm() -> LifeMonitor: + return LifeMonitor.get_instance() + + +@pytest.fixture +def service_registry() -> ClassManager: + registry = TestingService.service_type_registry + registry._load_concrete_types() + return registry + + +@pytest.fixture +def token_manager() -> TestingServiceTokenManager: + return TestingServiceTokenManager.get_instance() + + +@pytest.fixture +def no_cache(app_context): + app_context.app.config['CACHE_TYPE'] = "flask_caching.backends.nullcache.NullCache" + assert app_context.app.config.get('CACHE_TYPE') == "flask_caching.backends.nullcache.NullCache" + cache.init_app(app_context.app) + assert cache.cache_enabled is False, "Cache should be disabled" + return cache + + +@pytest.fixture +def redis_cache(app_context): + app_context.app.config['CACHE_TYPE'] = "flask_caching.backends.rediscache.RedisCache" + assert app_context.app.config.get('CACHE_TYPE') == "flask_caching.backends.rediscache.RedisCache" + cache.init_app(app_context.app) + assert cache.cache_enabled is True, "Cache should not be disabled" + cache.clear() + return cache + + @pytest.fixture(autouse=True) -def initialize(app_settings, request_context): +def initialize(app_settings, request_context, service_registry: ClassManager): + service_registry.remove_class("unknown") helpers.clean_db() + clear_cache(client_scope=False) helpers.init_db(app_settings) helpers.disable_auto_login() auth.logout_user() @@ -75,10 +120,12 @@ def initialize(app_settings, request_context): os.environ.pop("FLASK_APP_CONFIG_FILE", None) -def _get_app_settings(include_env=True): +def _get_app_settings(include_env=True, extra=None): settings = env_settings.copy() if include_env else {} settings.update(helpers.load_settings(app_settings_path)) settings.update(helpers.load_settings(tests_settings_path)) + if extra: + settings.update(extra) # remove API KEYS api_keys = {} pattern = re.compile("((\\w+)_API_KEY(_\\w+)?)") @@ -95,7 +142,11 @@ def _get_app_settings(include_env=True): @pytest.fixture(scope="session") def app_settings(request): if hasattr(request, 'param'): - return _get_app_settings(request.param) + logger.debug("App settings param: %r", request.param) + if isinstance(request.param, Iterable): + return _get_app_settings(*request.param) + else: + return _get_app_settings(request.param) return _get_app_settings() @@ -149,11 +200,6 @@ def app_context(app_settings): yield from helpers.app_context(app_settings, init_db=True, clean_db=False, drop_db=False) -@pytest.fixture -def lm() -> LifeMonitor: - return LifeMonitor.get_instance() - - @pytest.fixture() def user1(app_context, provider_type, client_credentials_registry, request): register_workflows = False @@ -248,12 +294,46 @@ def 
generic_workflow(app_client): 'uuid': str(uuid.uuid4()), 'version': '1', 'roc_link': "http://webserver:5000/download?file=ro-crate-galaxy-sortchangecase.crate.zip", - 'name': 'Galaxy workflow from Generic Link', + 'name': 'sort-and-change-case', 'testing_service_type': 'jenkins', 'authorization': app_client.application.config['WEB_SERVER_AUTH_TOKEN'] } +@pytest.fixture +def workflow_no_name(app_client): + return { + 'uuid': str(uuid.uuid4()), + 'version': '1', + 'roc_link': "http://webserver:5000/download?file=ro-crate-galaxy-sortchangecase-no-name.crate.zip", + 'name': 'Galaxy workflow from Generic Link (no name)', + 'testing_service_type': 'jenkins', + 'authorization': app_client.application.config['WEB_SERVER_AUTH_TOKEN'] + } + + +@pytest.fixture +def rate_limit_exceeded_workflow(app_client, service_registry: ClassManager, user1): + service_registry.add_class("unknown", RateLimitExceededTestingService) + wfdata = { + 'uuid': str(uuid.uuid4()), + 'version': '1', + 'roc_link': "http://webserver:5000/download?file=ro-crate-galaxy-sortchangecase-rate-limit-exceeded.crate.zip", + 'name': 'Galaxy workflow (rate limit exceeded)', + 'testing_service_type': 'unknown', + 'authorization': app_client.application.config['WEB_SERVER_AUTH_TOKEN'] + } + wfdata, workflow_version = register_workflow(user1, wfdata) + logger.info(wfdata) + logger.info(workflow_version) + assert workflow_version, "Workflows not found" + workflow = workflow_version.workflow + workflow.public = True + workflow.save() + assert workflow.public is True, "Workflow should be public" + return workflow + + @pytest.fixture def unmanaged_test_instance(app_client): return { diff --git a/tests/integration/api/controllers/test_instances.py b/tests/integration/api/controllers/test_instances.py index 1037f0c00..34cdaa684 100644 --- a/tests/integration/api/controllers/test_instances.py +++ b/tests/integration/api/controllers/test_instances.py @@ -19,13 +19,13 @@ # SOFTWARE. 
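The integration tests below pin the surfaced error down to its title and status code; for reference, this is how the retitled exception from lifemonitor/exceptions.py renders (the detail message is invented):

from lifemonitor import exceptions as lm_exceptions

try:
    raise lm_exceptions.RateLimitExceededException(detail="CI API quota exhausted")
except lm_exceptions.RateLimitExceededException as e:
    assert e.status == 403
    print(e)   # [403] Rate Limit Exceeded: CI API quota exhausted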
import json -import pytest import logging +import pytest +from lifemonitor.api import models from tests import utils from tests.conftest_types import ClientAuthenticationMethod - logger = logging.getLogger() @@ -51,7 +51,7 @@ def test_add_unmanaged_instance(app_client, client_auth_method, user1, user1_aut logger.debug(response) utils.assert_status_code(201, response.status_code) response_data = json.loads(response.data) - assert "test_instance_uuid" in response_data, "Unexpcted response: missing 'test_instance_uuid'" + assert "uuid" in response_data, "Unexpcted response: missing 'uuid'" # check number of instances after assert len(suite.test_instances) == num_of_instances + 1, "Unexpected number of instances" @@ -185,6 +185,22 @@ def test_get_instance_builds(app_client, client_auth_method, user1, user1_auth, utils.assert_properties_exist(["build_id", "instance"], item) +def test_get_instance_builds_rate_limit_exceeded(app_client, client_auth_method, user1, user1_auth, rate_limit_exceeded_workflow: models.Workflow): + workflow = rate_limit_exceeded_workflow.latest_version + assert len(workflow.test_suites) > 0, "Unexpected number of test suites" + suite = workflow.test_suites[0] + logger.debug("The test suite: %r", suite) + assert len(suite.test_instances) > 0, "Unexpected number of test instances" + instance = suite.test_instances[0] + logger.debug("The test instance: %r", instance) + response = app_client.get(f"{utils.build_instances_path(instance.uuid)}/latest-builds?limit=2", headers=user1_auth) + logger.debug(response) + utils.assert_status_code(403, response.status_code) + data = json.loads(response.data) + logger.debug("Response data: %r", data) + assert data['title'] == 'Rate Limit Exceeded', "Unexpected error title" + + @pytest.mark.parametrize("client_auth_method", [ # ClientAuthenticationMethod.BASIC, ClientAuthenticationMethod.API_KEY, @@ -240,7 +256,7 @@ def test_get_instance_build(app_client, client_auth_method, user1, user1_auth, v response = app_client.get(f"{utils.build_instances_path(instance.uuid)}/builds/{build.id}", headers=user1_auth) logger.debug(response) - utils.assert_status_code(response.status_code, 200) + utils.assert_status_code(200, response.status_code) data = json.loads(response.data) logger.debug("Response data: %r", data) # redundant check: the validation is performed by the connexion framework @@ -249,27 +265,24 @@ def test_get_instance_build(app_client, client_auth_method, user1, user1_auth, v @pytest.mark.parametrize("client_auth_method", [ # ClientAuthenticationMethod.BASIC, + ClientAuthenticationMethod.NOAUTH, ClientAuthenticationMethod.API_KEY, ClientAuthenticationMethod.AUTHORIZATION_CODE, ClientAuthenticationMethod.CLIENT_CREDENTIALS, ClientAuthenticationMethod.REGISTRY_CODE_FLOW ], indirect=True) -def test_get_instance_build_logs(app_client, client_auth_method, user1, user1_auth, valid_workflow): - w, workflow = utils.pick_and_register_workflow(user1, valid_workflow) +def test_get_instance_build_rate_limit_exceeded(app_client, client_auth_method, user1, user1_auth, rate_limit_exceeded_workflow: models.Workflow): + workflow = rate_limit_exceeded_workflow.latest_version assert len(workflow.test_suites) > 0, "Unexpected number of test suites" suite = workflow.test_suites[0] logger.debug("The test suite: %r", suite) assert len(suite.test_instances) > 0, "Unexpected number of test instances" instance = suite.test_instances[0] logger.debug("The test instance: %r", instance) - assert len(instance.get_test_builds()) > 0, "Unexpected number of test builds" 
- build = instance.get_test_builds()[0] - response = app_client.get(f"{utils.build_instances_path(instance.uuid)}/builds/{build.id}/logs", - headers=user1_auth) - logger.debug(response.data) - utils.assert_status_code(response.status_code, 200) + response = app_client.get(f"{utils.build_instances_path(instance.uuid)}/builds/0", headers=user1_auth) + logger.debug(response) + utils.assert_status_code(403, response.status_code) data = json.loads(response.data) logger.debug("Response data: %r", data) - # redundant check: the validation is performed by the connexion framework - assert isinstance(data, str), "Unexpected result type" + assert data['title'] == 'Rate Limit Exceeded', "Unexpected error title" diff --git a/tests/integration/api/controllers/test_registries.py b/tests/integration/api/controllers/test_registries.py index e347ab65f..98767f8d9 100644 --- a/tests/integration/api/controllers/test_registries.py +++ b/tests/integration/api/controllers/test_registries.py @@ -45,7 +45,7 @@ def test_get_registries(app_client, client_auth_method, user1, user1_auth): assert len(data['items']) == 1, "Invalid number of registries" -def test_get_registries_no_authorization(app_client, fake_registry): +def test_get_registries_no_authorization(app_client, no_cache, fake_registry): response = app_client.get(utils.build_registries_path()) utils.assert_status_code(200, response.status_code) assert response.data, "Empty response" diff --git a/tests/integration/api/controllers/test_users.py b/tests/integration/api/controllers/test_users.py index 6c312b0f7..ee0357283 100644 --- a/tests/integration/api/controllers/test_users.py +++ b/tests/integration/api/controllers/test_users.py @@ -155,6 +155,64 @@ def test_generic_workflow_registration_wo_uuid(app_client, client_auth_method, assert data['uuid'], "Workflow UUID was not generated or returned" +@pytest.mark.parametrize("client_auth_method", [ + ClientAuthenticationMethod.API_KEY, + ClientAuthenticationMethod.AUTHORIZATION_CODE, +], indirect=True) +def test_generic_workflow_registration_no_name_exception(app_client, client_auth_method, + user1, user1_auth, client_credentials_registry, workflow_no_name): + logger.debug("User: %r", user1) + logger.debug("headers: %r", user1_auth) + workflow = workflow_no_name + logger.debug("Selected workflow: %r", workflow) + logger.debug("Using oauth2 user: %r", user1) + # prepare body + body = {'roc_link': workflow['roc_link'], + 'version': workflow['version'], + 'authorization': workflow['authorization']} + logger.debug("The BODY: %r", body) + response = app_client.post('/users/current/workflows', json=body, headers=user1_auth) + logger.debug("The actual response: %r", response.data) + + utils.assert_status_code(400, response.status_code) + data = json.loads(response.data) + logger.debug("Response data: %r", data) + assert data['title'] == 'Missing attribute \'name\'', "Unexpected error" + + +@pytest.mark.parametrize("client_auth_method", [ + ClientAuthenticationMethod.API_KEY, + ClientAuthenticationMethod.AUTHORIZATION_CODE, +], indirect=True) +def test_generic_workflow_registration_no_name(app_client, client_auth_method, + user1, user1_auth, client_credentials_registry, generic_workflow): + logger.debug("User: %r", user1) + logger.debug("headers: %r", user1_auth) + workflow = generic_workflow + logger.debug("Selected workflow: %r", workflow) + logger.debug("Using oauth2 user: %r", user1) + # prepare body + body = {'roc_link': workflow['roc_link'], + 'version': workflow['version'], + 'authorization': 
workflow['authorization']} + logger.debug("The BODY: %r", body) + response = app_client.post('/users/current/workflows', json=body, headers=user1_auth) + logger.debug("The actual response: %r", response.data) + utils.assert_status_code(201, response.status_code) + data = json.loads(response.data) + logger.debug("Response data: %r", data) + assert data['wf_version'] == workflow['version'], \ + "Response should be equal to the workflow UUID" + assert data['uuid'], "Workflow UUID was not generated or returned" + + response = app_client.get(f'/workflows/{data["uuid"]}', headers=user1_auth) + logger.debug("The actual response: %r", response.data) + utils.assert_status_code(200, response.status_code) + data = json.loads(response.data) + logger.debug("Response data: %r", data) + assert data['name'] == workflow['name'], "Unexpected workflow name" + + @pytest.mark.parametrize("client_auth_method", [ ClientAuthenticationMethod.API_KEY, ClientAuthenticationMethod.AUTHORIZATION_CODE, diff --git a/tests/integration/api/controllers/test_workflows.py b/tests/integration/api/controllers/test_workflows.py index 8e67fa010..6b31b06d4 100644 --- a/tests/integration/api/controllers/test_workflows.py +++ b/tests/integration/api/controllers/test_workflows.py @@ -23,7 +23,7 @@ import uuid import pytest -from lifemonitor.api.models import WorkflowVersion +from lifemonitor.api.models import WorkflowVersion, Workflow from lifemonitor.auth import current_user from lifemonitor.auth.models import ApiKey from lifemonitor.auth.oauth2.server.models import Token @@ -139,6 +139,30 @@ def test_get_workflows_public(app_client, client_auth_method, user1): assert len(workflows) == 1, "Unexpected number of public workflows" +@pytest.mark.parametrize("client_auth_method", [ + ClientAuthenticationMethod.NOAUTH, +], indirect=True) +@pytest.mark.parametrize("user1", [True], indirect=True) +def test_get_workflows_public_with_rate_limit_exceeded_workflow(app_client, client_auth_method, user1, rate_limit_exceeded_workflow: Workflow): + # get workflows registered by user1 + response = app_client.get(f"{utils.build_workflow_path()}?status=true") + assert response.status_code == 200, "Error getting public workflows" + workflows = json.loads(response.data)['items'] + assert len(workflows) == 2, "Unexpected number of public workflows" + logger.debug("Got workflows: %r", workflows) + for w in workflows: + logger.debug("Checking workflow %r", w) + assert 'status' in w, f"Unable to find the status for the workflow {w['uuid']}" + assert 'aggregate_test_status' in w['status'], f"Unable to find the aggregate_test_status for the workflow {w['uuid']}" + if w['uuid'] == str(rate_limit_exceeded_workflow.uuid): + logger.debug("Checking workflow with rate limit exceeded %r", w['uuid']) + assert w['status']["aggregate_test_status"] == 'not_available', "Unexpected status for workflow with rate limit exceeded" + assert "reason" in w['status'], f"Unable to find the 'reason' property for the workflow {w['uuid']}" + assert "Rate Limit Exceeded" in w['status']['reason'], f"Invalid 'reason' value for the workflow {w['uuid']}" + else: + assert "reason" not in w['status'], f"The 'reason' property should not be set for the workflow {w['uuid']}" + + @pytest.mark.parametrize("client_auth_method", [ # ClientAuthenticationMethod.BASIC, ClientAuthenticationMethod.API_KEY, diff --git a/tests/rate_limit_exceeded.py b/tests/rate_limit_exceeded.py new file mode 100644 index 000000000..458b05c20 --- /dev/null +++ b/tests/rate_limit_exceeded.py @@ -0,0 +1,84 @@ +# Copyright 
(c) 2020-2021 CRS4 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +from typing import Any, Dict, List + +from lifemonitor import exceptions as lm_exceptions +from lifemonitor.api import models + + +class RateLimitExceededTestingService(models.TestingService): + + __mapper_args__ = { + 'polymorphic_identity': 'unknown' + } + + @property + def token(self) -> models.TestingServiceToken: + return None + + def initialize(self): + pass + + def check_connection(self) -> bool: + raise lm_exceptions.RateLimitExceededException() + + def is_workflow_healthy(self, test_instance: models.TestInstance) -> bool: + raise lm_exceptions.RateLimitExceededException() + + def get_instance_external_link(self, test_instance: models.TestInstance) -> str: + raise lm_exceptions.RateLimitExceededException() + + def get_last_test_build(self, test_instance: models.TestInstance) -> models.TestBuild: + raise lm_exceptions.RateLimitExceededException() + + def get_last_passed_test_build(self, test_instance: models.TestInstance) -> models.TestBuild: + raise lm_exceptions.RateLimitExceededException() + + def get_last_failed_test_build(self, test_instance: models.TestInstance) -> models.TestBuild: + raise lm_exceptions.RateLimitExceededException() + + def get_test_build(self, test_instance: models.TestInstance, build_number: int) -> models.TestBuild: + raise lm_exceptions.RateLimitExceededException() + + def get_test_build_external_link(self, test_build: models.TestBuild) -> str: + raise lm_exceptions.RateLimitExceededException() + + def get_test_builds(self, test_instance: models.TestInstance, limit: int = 10) -> list: + raise lm_exceptions.RateLimitExceededException() + + def get_test_builds_as_dict(self, test_instance: models.TestInstance, test_output) -> Dict[str, Any]: + raise lm_exceptions.RateLimitExceededException() + + def to_dict(self, test_builds: bool = False, test_output: bool = False) -> dict: + raise lm_exceptions.RateLimitExceededException() + + @classmethod + def all(cls) -> List[models.TestingService]: + raise lm_exceptions.RateLimitExceededException() + + @classmethod + def find_by_uuid(cls, uuid) -> models.TestingService: + raise lm_exceptions.RateLimitExceededException() + + @classmethod + def find_by_url(cls, url) -> models.TestingService: + raise lm_exceptions.RateLimitExceededException() diff --git a/tests/settings.conf b/tests/settings.conf index b5215412f..77c5aa288 100644 --- a/tests/settings.conf +++ b/tests/settings.conf @@ -31,6 +31,18 @@ #TEST_SECRET_KEY="" #PROD_SECRET_KEY="" 
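The stub service above gets wired to the 'unknown' service type through the new ClassManager mutators added in lifemonitor/utils.py; a sketch of the fixture mechanics:

from lifemonitor.api.models import TestingService
from tests.rate_limit_exceeded import RateLimitExceededTestingService

registry = TestingService.service_type_registry
registry.add_class("unknown", RateLimitExceededTestingService)
try:
    assert registry.get_class("unknown") is RateLimitExceededTestingService
finally:
    registry.remove_class("unknown")  # restore, as the initialize fixture does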
+REDIS_HOST=redis +REDIS_PASSWORD=foobar +REDIS_PORT_NUMBER=6379 + +# Cache settings +CACHE_REDIS_DB=0 +CACHE_DEFAULT_TIMEOUT=300 +CACHE_REQUEST_TIMEOUT=15 +CACHE_SESSION_TIMEOUT=3600 +CACHE_WORKFLOW_TIMEOUT=1800 +CACHE_BUILD_TIMEOUT=86400 + # PostgreSQL DBMS settings #POSTGRESQL_HOST=0.0.0.0 #POSTGRESQL_PORT=5432 diff --git a/tests/unit/api/controllers/test_instances.py b/tests/unit/api/controllers/test_instances.py index dfec2163d..98798fe4e 100644 --- a/tests/unit/api/controllers/test_instances.py +++ b/tests/unit/api/controllers/test_instances.py @@ -71,7 +71,7 @@ def test_get_instance_by_user_error_forbidden(m, request_context, mock_user): @patch("lifemonitor.api.controllers.lm") -def test_get_instance_by_user(m, request_context, mock_user): +def test_get_instance_by_user(m, request_context, no_cache, mock_user): assert not auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_registry is not None, "Unexpected registry in session" workflow = MagicMock() @@ -95,7 +95,7 @@ def test_get_instance_by_user(m, request_context, mock_user): @patch("lifemonitor.api.controllers.lm") -def test_get_instance_build_by_user_error_not_found(m, request_context, mock_user): +def test_get_instance_build_by_user_error_not_found(m, request_context, no_cache, mock_user): assert not auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_registry is not None, "Unexpected registry in session" instance = MagicMock() @@ -136,7 +136,32 @@ def test_get_instance_build_by_user(m, request_context, mock_user): @patch("lifemonitor.api.controllers.lm") -def test_get_instance_build_last_logs_by_user(m, request_context, mock_user): +def test_get_instance_build_by_user_rate_limit_exceeded(lm, request_context, mock_user, rate_limit_exceeded_workflow: models.Workflow): + assert not auth.current_user.is_anonymous, "Unexpected user in session" + assert auth.current_registry is not None, "Unexpected registry in session" + # set workflow + workflow = rate_limit_exceeded_workflow + lm.get_public_workflows.return_value = [] + lm.get_user_workflows.return_value = [rate_limit_exceeded_workflow] + # set suite + suite: models.TestSuite = workflow.latest_version.test_suites[0] + lm.get_suite.return_value = suite + # set instance + instance: models.TestInstance = suite.test_instances[0] + lm.get_test_instance.return_value = instance + # get and check suite status + response = controllers.instances_builds_get_by_id(instance.uuid, "123") + logger.debug(response.data) + lm.get_test_instance.assert_called_once() + lm.get_suite.assert_called_once() + data = json.loads(response.data) + assert isinstance(data, dict), "Unexpected response type" + assert 403 == int(data["status"]), "Unexpected status code" + assert "Rate Limit Exceeded" == data["title"], "Unexpected error title" + + +@patch("lifemonitor.api.controllers.lm") +def test_get_instance_build_last_logs_by_user(m, request_context, no_cache, mock_user): assert not auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_registry is not None, "Unexpected registry in session" workflow = {"uuid": "1111-222"} @@ -282,7 +307,7 @@ def test_get_instance_by_registry_error_forbidden(m, request_context, mock_regis @patch("lifemonitor.api.controllers.lm") -def test_get_instance_by_registry_error_not_found(m, request_context, mock_registry): +def test_get_instance_by_registry_error_not_found(m, request_context, no_cache, mock_registry): assert auth.current_user.is_anonymous, "Unexpected user in session" assert
auth.current_registry, "Unexpected registry in session" workflow = {"uuid": "1111-222"} @@ -297,7 +322,7 @@ def test_get_instance_by_registry_error_not_found(m, request_context, mock_regis @patch("lifemonitor.api.controllers.lm") -def test_get_instance_build_by_registry_error_not_found(m, request_context, mock_registry): +def test_get_instance_build_by_registry_error_not_found(m, request_context, no_cache, mock_registry): assert auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_registry, "Unexpected registry in session" build = MagicMock() @@ -331,3 +356,28 @@ def test_get_instance_build_by_registry(m, request_context, mock_registry): response = controllers.instances_builds_get_by_id(instance['uuid'], build.id) m.get_test_instance.assert_called_once() assert isinstance(response, dict), "Unexpected response type" + + +@patch("lifemonitor.api.controllers.lm") +def test_get_instance_build_by_registry_rate_limit_exceeded(lm, request_context, mock_registry, rate_limit_exceeded_workflow: models.Workflow): + assert auth.current_user.is_anonymous, "Unexpected user in session" + assert auth.current_registry, "Unexpected registry in session" + # set workflow + workflow = rate_limit_exceeded_workflow + lm.get_public_workflows.return_value = [] + lm.get_user_workflows.return_value = [rate_limit_exceeded_workflow] + # set suite + suite: models.TestSuite = workflow.latest_version.test_suites[0] + lm.get_suite.return_value = suite + # set instance + instance: models.TestInstance = suite.test_instances[0] + lm.get_test_instance.return_value = instance + # get and check suite status + response = controllers.instances_builds_get_by_id(instance.uuid, "123") + logger.debug(response.data) + lm.get_test_instance.assert_called_once() + lm.get_suite.assert_called_once() + data = json.loads(response.data) + assert isinstance(data, dict), "Unexpected response type" + assert 403 == int(data["status"]), "Unexpected status code" + assert "Rate Limit Exceeded" == data["title"], "Unexpected error title" diff --git a/tests/unit/api/controllers/test_suites.py b/tests/unit/api/controllers/test_suites.py index 9360143f0..08c6bbcf2 100644 --- a/tests/unit/api/controllers/test_suites.py +++ b/tests/unit/api/controllers/test_suites.py @@ -21,6 +21,7 @@ import logging from unittest.mock import MagicMock, patch +import lifemonitor.api.models as models import lifemonitor.api.controllers as controllers import lifemonitor.auth as auth import lifemonitor.exceptions as lm_exceptions @@ -41,7 +42,7 @@ def test_get_suite_error_not_found(m, request_context, mock_user): @patch("lifemonitor.api.controllers.lm") -def test_get_suite_by_user_without_auth_access_to_workflow(m, request_context, mock_user): +def test_get_suite_by_user_without_auth_access_to_workflow(m, request_context, no_cache, mock_user): # add one user to the current session assert not auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_user == mock_user, "Unexpected user in session" @@ -64,7 +65,7 @@ def test_get_suite_by_user_without_auth_access_to_workflow(m, request_context, m @patch("lifemonitor.api.controllers.lm") -def test_get_suite_by_registry_without_auth_access_to_workflow(m, request_context, mock_registry): +def test_get_suite_by_registry_without_auth_access_to_workflow(m, request_context, no_cache, mock_registry): # add one user to the current session assert auth.current_user.is_anonymous, "Unexpected user in session" logger.debug("Current registry: %r", auth.current_registry) @@ -132,7 +133,7 @@ def 
test_get_suite_by_registry(m, request_context, mock_registry): @patch("lifemonitor.api.controllers.lm") -def test_get_suite_status_by_user(m, request_context, mock_user): +def test_get_suite_status_by_user(m, request_context, no_cache, mock_user): # add one user to the current session assert not auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_user == mock_user, "Unexpected user in session" @@ -155,7 +156,30 @@ def test_get_suite_status_by_user(m, request_context, mock_user): @patch("lifemonitor.api.controllers.lm") -def test_get_suite_status_by_registry(m, request_context, mock_registry): +def test_get_suite_status_by_user_rate_limit_exceeded(lm, mock_user, rate_limit_exceeded_workflow: models.Workflow): + # add one user to the current session + assert not auth.current_user.is_anonymous, "Unexpected user in session" + assert auth.current_user == mock_user, "Unexpected user in session" + logger.debug("Current registry: %r", auth.current_registry) + assert not auth.current_registry, "Unexpected registry in session" + # set workflow + workflow = rate_limit_exceeded_workflow + lm.get_public_workflows.return_value = [] + lm.get_user_workflows.return_value = [rate_limit_exceeded_workflow] + # set suite + suite: models.TestSuite = workflow.latest_version.test_suites[0] + lm.get_suite.return_value = suite + # get and check suite status + response = controllers.suites_get_status(suite.uuid) + lm.get_suite.assert_called_once() + logger.info(response) + for p in ["latest_builds", "suite_uuid", "status"]: + assert p in response, f"Property {p} not found on response" + assert response['status'] == 'not_available' + + +@patch("lifemonitor.api.controllers.lm") +def test_get_suite_status_by_registry(m, request_context, no_cache, mock_registry): # add one user to the current session assert auth.current_user.is_anonymous, "Unexpected user in session" logger.debug("Current registry: %r", auth.current_registry) @@ -169,7 +193,7 @@ def test_get_suite_status_by_registry(m, request_context, mock_registry): suite.workflow = workflow m.get_suite.return_value = suite m.get_public_workflow_version.return_value = None - m.get_registry_workflow_version.return_value = suite + m.get_registry_workflow_version.return_value = workflow response = controllers.suites_get_status(suite.suite) m.get_suite.assert_called_once() m.get_registry_workflow_version.assert_called_once() @@ -180,7 +204,29 @@ -def test_get_suite_instances_by_user(m, request_context, mock_user): +def test_get_suite_status_by_registry_rate_limit_exceeded(lm, request_context, mock_registry, rate_limit_exceeded_workflow: models.Workflow): + # add one user to the current session + assert auth.current_user.is_anonymous, "Unexpected user in session" + logger.debug("Current registry: %r", auth.current_registry) + assert auth.current_registry, "Unexpected registry in session" + # set workflow + workflow = rate_limit_exceeded_workflow + lm.get_public_workflows.return_value = [] + # set suite + suite: models.TestSuite = workflow.latest_version.test_suites[0] + lm.get_suite.return_value = suite + lm.get_registry_workflow_version.return_value = workflow.latest_version + # get and check suite status + response = controllers.suites_get_status(suite.uuid) + lm.get_suite.assert_called_once() + logger.info(response) + for p in ["latest_builds", "suite_uuid", "status"]: + assert p in response, f"Property {p} not found on response" + assert
response['status'] == 'not_available' + + +@patch("lifemonitor.api.controllers.lm") +def test_get_suite_instances_by_user(m, request_context, no_cache, mock_user): # add one user to the current session assert not auth.current_user.is_anonymous, "Unexpected user in session" assert auth.current_user == mock_user, "Unexpected user in session" @@ -202,7 +248,7 @@ def test_get_suite_instances_by_user(m, request_context, mock_user): @patch("lifemonitor.api.controllers.lm") -def test_get_suite_instances_by_registry(m, request_context, mock_registry): +def test_get_suite_instances_by_registry(m, request_context, no_cache, mock_registry): # add one user to the current session assert auth.current_user.is_anonymous, "Unexpected user in session" logger.debug("Current registry: %r", auth.current_registry) diff --git a/tests/unit/api/controllers/test_workflows.py b/tests/unit/api/controllers/test_workflows.py index 061bf4950..3576f63e5 100644 --- a/tests/unit/api/controllers/test_workflows.py +++ b/tests/unit/api/controllers/test_workflows.py @@ -18,11 +18,13 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. + import logging from unittest.mock import MagicMock, patch import pytest + import lifemonitor.api.controllers as controllers import lifemonitor.api.models as models import lifemonitor.api.serializers as serializers @@ -46,9 +48,9 @@ def test_get_workflows_with_user(m, request_context, mock_user, fake_uri): # make empty the list of public workflows m.get_public_workflows.return_value = [] # add one fake workflow - data = {"uuid": "123456", "version": "1.0", "uri": fake_uri} + data = {"uuid": "123456", "version": "1.0", "name": "Fake workflow", "uri": fake_uri} w = models.Workflow(uuid=data['uuid']) - w.add_version(data["version"], data['uri'], MagicMock()) + w.add_version(data["version"], data['uri'], MagicMock(), name=data['name']) m.get_user_workflows.return_value = [w] response = controllers.workflows_get(status=True) m.get_public_workflows.assert_called_once() @@ -58,6 +60,46 @@ def test_get_workflows_with_user(m, request_context, mock_user, fake_uri): assert response == serializers.ListOfWorkflows(workflow_status=True).dump([w]) +@patch("lifemonitor.api.controllers.lm") +def test_get_public_workflows_rate_limit_exceeded(lm, rate_limit_exceeded_workflow): + # set workflow as public + lm.get_public_workflows.return_value = [rate_limit_exceeded_workflow] + # get workflows + data = controllers.workflows_get(status=True) + logger.info(data) + # check number of items + assert len(data['items']) == 1, "Unexpected number of items" + # inspect item + item = data['items'][0] + assert 'status' in item, "Workflow status should be set" + assert 'aggregate_test_status' in item['status'], "'aggregate_test_status' should be set" + assert item['status']['aggregate_test_status'] == 'not_available' + assert "Rate Limit Exceeded" in item['status']['reason'], "Unexpected reason for unavailability" + +
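+# NOTE: rate_limit_exceeded_workflow is expected to be a fixture (presumably defined in the test configuration) that binds the workflow's test instances to the RateLimitExceededTestingService from tests/rate_limit_exceeded.py, so that every call to the testing service raises RateLimitExceededException.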
+@patch("lifemonitor.api.controllers.lm") +def test_get_workflows_with_user_rate_limit_exceeded(lm, mock_user, no_cache, rate_limit_exceeded_workflow): + # add one user to the current session + assert not auth.current_user.is_anonymous, "Unexpected user in session" + assert auth.current_user == mock_user, "Unexpected user in session" + logger.debug("Current registry: %r", auth.current_registry) + assert not auth.current_registry, "Unexpected registry in session" + # set workflows + lm.get_public_workflows.return_value = [] + lm.get_user_workflows.return_value = [rate_limit_exceeded_workflow] + # get workflows + data = controllers.workflows_get(status=True) + logger.info(data) + # check number of items + assert len(data['items']) == 1, "Unexpected number of items" + # inspect item + item = data['items'][0] + assert 'status' in item, "Workflow status should be set" + assert 'aggregate_test_status' in item['status'], "'aggregate_test_status' should be set" + assert item['status']['aggregate_test_status'] == 'not_available' + assert "Rate Limit Exceeded" in item['status']['reason'], "Unexpected reason for unavailability" + + @patch("lifemonitor.api.controllers.lm") def test_get_workflows_with_registry(m, request_context, mock_registry, fake_uri): assert auth.current_user.is_anonymous, "Unexpected user in session" @@ -75,6 +117,30 @@ def test_get_workflows_with_registry(m, request_context, mock_registry, fake_uri assert response == serializers.ListOfWorkflows(workflow_status=True).dump([w]) +@patch("lifemonitor.api.controllers.lm") +def test_get_workflows_with_registry_rate_limit_exceeded(m, request_context, mock_registry, fake_uri, rate_limit_exceeded_workflow): + assert auth.current_user.is_anonymous, "Unexpected user in session" + assert auth.current_registry, "Unexpected registry in session" + # make empty the list of public workflows + m.get_public_workflows.return_value = [] + m.get_registry_workflows.return_value = [rate_limit_exceeded_workflow] + response = controllers.workflows_get(status=True) + m.get_registry_workflows.assert_called_once() + assert isinstance(response, dict), "Unexpected result type" + assert response == serializers.ListOfWorkflows(workflow_status=True).dump([rate_limit_exceeded_workflow]) + # check number of items + assert len(response['items']) == 1, "Unexpected number of items" + # inspect item + item = response['items'][0] + assert 'status' in item, "Workflow status should be set" + assert 'aggregate_test_status' in item['status'], "'aggregate_test_status' should be set" + assert item['status']['aggregate_test_status'] == 'not_available' + assert "Rate Limit Exceeded" in item['status']['reason'], "Unexpected reason for unavailability" + + @patch("lifemonitor.api.controllers.lm") def test_post_workflows_no_authorization(m, request_context): assert auth.current_user.is_anonymous, "Unexpected user in session" diff --git a/tests/unit/cache/test_cache.py b/tests/unit/cache/test_cache.py new file mode 100644 index 000000000..84085e6be --- /dev/null +++ b/tests/unit/cache/test_cache.py @@ -0,0 +1,490 @@ +# Copyright (c) 2020-2021 CRS4 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software.
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import logging +import threading +from multiprocessing import Manager, Process +from time import sleep +from unittest.mock import MagicMock + +import lifemonitor.api.models as models +import pytest +from lifemonitor.cache import (IllegalStateException, Timeout, cache, + init_cache, make_cache_key) +from tests import utils +from tests.unit.test_utils import SerializableMock + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize("app_settings", [(False, {"CACHE_TYPE": "flask_caching.backends.simplecache.SimpleCache"})], indirect=True) +def test_cache_config(app_settings, app_context): + logger.debug("App settings: %r", app_settings) + app = app_context.app + logger.debug("App: %r", app) + config = app.config + logger.debug("Config: %r", config) + assert config.get("CACHE_TYPE") == "flask_caching.backends.simplecache.SimpleCache", "Unexpected cache type on app config" + init_cache(app) + assert cache.cache_enabled is False, "Cache should be disabled" + with pytest.raises(IllegalStateException): + cache.backend + + +def test_cache_transaction_setup(app_context, redis_cache): + cache.clear() + key = "test" + value = "test" + assert cache.size() == 0, "Cache should be empty" + with cache.transaction("test") as t: + assert t.size() == 0, "Unexpected transaction size: it should be empty" + t.set(key, value) + assert t.size() == 1, "Unexpected transaction size: it should be equal to 1" + assert t.has(key), f"key '{key}' should be set in the current transaction" + assert cache.size() == 0, "Cache should be empty" + + assert cache.size() == 1, "Cache should contain one element" + assert cache.has(key), f"key '{key}' should be in cache" + assert cache.get_current_transaction() is None, "Unexpected transaction" + + +def test_cache_timeout(app_context, redis_cache): + cache.clear() + assert cache.size() == 0, "Cache should be empty" + key = "test5" + value = 1024 + timeout = 5 + cache.set(key, value, timeout=timeout) + assert cache.size() == 1, "Cache should not be empty" + assert cache.has(key) is True, f"Key {key} should be in cache" + sleep(5) + assert cache.size() == 0, "Cache should be empty" + assert cache.has(key) is False, f"Key {key} should not be in cache after {timeout} secs" + + +def test_cache_last_build(app_context, redis_cache, user1): + valid_workflow = 'sort-and-change-case' + cache.clear() + assert cache.size() == 0, "Cache should be empty" + _, workflow = utils.pick_and_register_workflow(user1, valid_workflow) + assert workflow, "Workflow should be set" + assert len(workflow.test_suites) > 0, "The workflow should have at least one suite" + suite: models.TestSuite = workflow.test_suites[0] + assert len(suite.test_instances) > 0, "The suite should have at least one test instance" + instance: models.TestInstance = suite.test_instances[0] + + last_build_key = make_cache_key(instance.get_last_test_build, client_scope=False, args=(instance,)) + assert instance.cache.get(last_build_key) is None, "Cache should be empty" + build = instance.last_test_build + assert build, "Last build should not be
empty" + cached_build = instance.cache.get(last_build_key) + assert cached_build is not None, "Cache should not be empty" + assert build == cached_build, "Build should be equal to the cached build" + + instance.get_test_builds = MagicMock(return_value=None) + + build = instance.last_test_build + assert build, "Last build should not be empty" + assert instance.get_test_builds.assert_not_called, "instance.get_test_builds should not be used" + assert build == cached_build, "Build should be equal to the cached build" + + +def test_cache_test_builds(app_context, redis_cache, user1): + valid_workflow = 'sort-and-change-case' + cache.clear() + assert cache.size() == 0, "Cache should be empty" + _, workflow = utils.pick_and_register_workflow(user1, valid_workflow) + assert workflow, "Workflow should be set" + assert len(workflow.test_suites) > 0, "The workflow should have at least one suite" + suite: models.TestSuite = workflow.test_suites[0] + assert len(suite.test_instances) > 0, "The suite should have at least one test instance" + instance: models.TestInstance = suite.test_instances[0] + + limit = 10 + cache_key = make_cache_key(instance.get_test_builds, client_scope=False, args=(instance,), kwargs={"limit": limit}) + assert instance.cache.get(cache_key) is None, "Cache should be empty" + builds = instance.get_test_builds(limit=limit) + assert builds and len(builds) > 0, "Invalid number of builds" + + cached_builds = instance.cache.get(cache_key) + assert cached_builds is not None and len(cached_builds) > 0, "Cache should not be empty" + assert len(builds) == len(cached_builds), "Unexpected number of cached builds" + + instance.testing_service.get_test_builds = MagicMock(return_value=None) + builds = instance.get_test_builds(limit=limit) + assert builds and len(builds) > 0, "Invalid number of builds" + assert instance.testing_service.get_test_builds.assert_not_called, "instance.get_test_builds should not be used" + assert len(builds) == len(cached_builds), "Unexpected number of cached builds" + + limit = 20 + cache_key = make_cache_key(instance.get_test_builds, client_scope=False, args=(instance,), kwargs={"limit": limit}) + assert instance.cache.get(cache_key) is None, "Cache should be empty" + + +def setup_test_cache_last_build_update(app_context, redis_cache, user1): + valid_workflow = 'sort-and-change-case' + logger.debug("Cache content: %r", cache.keys) + cache.clear() + assert cache.size() == 0, "Cache should be empty" + _, w = utils.pick_and_register_workflow(user1, valid_workflow) + assert w, "Workflow should be set" + return w + + +def test_cache_last_build_update(app_context, redis_cache, user1): + w = setup_test_cache_last_build_update(app_context, redis_cache, user1) + cache.reset_locks() + sleep(2) + cache_last_build_update(app_context.app, w, user1, check_cache_size=True) + + +def cache_last_build_update(app, w, user1, check_cache_size=True, index=0, + multithreaded=False, results=None): + transactions = [] + logger.debug("Params of thread %r", index) + logger.debug("%r %r %r %r", check_cache_size, index, multithreaded, results) + if not multithreaded: + assert len(cache.backend.keys("lock*")) == 0, "No lock should be set" + with app.app_context(): + transaction_keys = None + with cache.transaction(f"T{index}") as t: + logger.debug("Current transaction: %r", t) + logger.debug("Current workflow: %r", w) + transactions.append(t) + + assert cache.get_current_transaction() == t, "Unexpected transaction" + for s in w.test_suites: + logger.info("[t#%r] Updating workflow (): %r", index, 
w) + for i in s.test_instances: + get_test_builds_method = i.testing_service.get_test_builds + builds_data = i.testing_service.get_test_builds(i) + i.testing_service.get_test_builds = SerializableMock() + i.testing_service.get_test_builds.return_value = builds_data + + assert cache.get_current_transaction() == t, "Unexpected transaction" + assert i.cache.get_current_transaction() == t, "Unexpected transaction" + + cache_key = make_cache_key(i.get_test_builds, client_scope=False, args=[i]) + logger.debug("The cache key: %r", cache_key) + + ############################################################################# + # latest builds (first call) + ############################################################################# + logger.debug("\n\nGetting latest builds (first call)...") + builds = i.get_test_builds() + logger.debug("Getting latest builds (first call): %r\n", builds) + assert t.has(cache_key), "The key should be in the current transaction" + cache_size = cache.size() + logger.debug("Current cache size: %r", cache_size) + assert i.cache.get_current_transaction() == t, "Unexpected transaction" + # check cache + if not multithreaded: + i.testing_service.get_test_builds.assert_called_once(), "i.testing_service.get_test_builds should be called once" + logger.debug(f"Checking if key {cache_key} is in cache...") + assert not cache.has(cache_key), "The key should not be in cache" + + ############################################################################# + # latest builds (second call) + ############################################################################# + logger.debug("\n\nGetting latest builds (second call)...") + builds = i.get_test_builds() + logger.debug("Getting latest builds (second call): %r\n", builds) + assert i.cache.get_current_transaction() == t, "Unexpected transaction" + assert t.has(cache_key), "The key should be in the current transaction" + if not multithreaded: + i.testing_service.get_test_builds.assert_called_once(), "i.testing_service.get_test_builds should be called once" + logger.debug(f"Checking if key {cache_key} is in cache...") + assert not cache.has(cache_key), "The key should not be in cache" + if check_cache_size: + assert cache.size() == cache_size, "Unexpected cache size" + + ############################################################################# + # latest builds (third call) + ############################################################################# + logger.debug("\n\nGetting latest builds (third call)...") + builds = i.get_test_builds() + logger.debug("Getting latest builds (third call): %r\n", builds) + assert i.cache.get_current_transaction() == t, "Unexpected transaction" + assert t.has(cache_key), "The key should be in the current transaction" + if not multithreaded: + i.testing_service.get_test_builds.assert_called_once(), "i.testing_service.get_test_builds should be called once" + logger.debug(f"Checking if key {cache_key} is in cache...") + assert not cache.has(cache_key), "The key should not be in cache" + if check_cache_size: + assert cache.size() == cache_size, "Unexpected cache size" + + ############################################################################# + # Check builds + ############################################################################# + logger.debug("\n\nPreparing data to test builds...") + b_data = [] + for b in builds: + b_data.append(i.testing_service.get_test_build(i, b.id)) + logger.debug("\n\nPreparing data to test builds... 
DONE") + + assert len(b_data) == 4, "Unexpected number of builds" + + logger.debug("\n\nChecking test builds...") + get_test_build_method = i.testing_service.get_test_build + + for count in range(0, len(b_data)): + b = b_data[count] + i.testing_service.get_test_build = SerializableMock() + i.testing_service.get_test_build.return_value = b + + cache_key = make_cache_key(i.get_test_build, client_scope=False, args=[i, b.id]) + + # first call ############################################################# + logger.debug("\n\nChecking build (first call): buildID=%r", b.id) + logger.debug("Build data: %r", i.get_test_build(b.id)) + assert t.has(cache_key), "The key should be in the current transaction" + if not multithreaded: + i.testing_service.get_test_build.call_count == count + 1, "i.testing_service.get_test_build should be called once" + logger.debug(f"Checking if key {cache_key} is in cache...") + assert not cache.has(cache_key), "The key should not be in cache" + cache_size = cache.size() + logger.debug("Current cache size: %r", cache_size) + + # second call ############################################################# + logger.debug("\n\nChecking build (second call): buildID=%r", b.id) + logger.debug("Build data: %r", i.get_test_build(b.id)) + assert t.has(cache_key), "The key should be in the current transaction" + if not multithreaded: + i.testing_service.get_test_build.call_count == count + 1, "i.testing_service.get_test_build should be called once" + logger.debug(f"Checking if key {cache_key} is in cache...") + assert not cache.has(cache_key), "The key should not be in cache" + if check_cache_size: + assert cache.size() == cache_size, "Unexpected cache size" + # third call ############################################################# + logger.debug("\n\nChecking build (third call): buildID=%r", b.id) + logger.debug("Build data: %r", i.get_test_build(b.id)) + assert t.has(cache_key), "The key should be in the current transaction" + if not multithreaded: + i.testing_service.get_test_build.call_count == count + 1, "i.testing_service.get_test_build should be called once" + logger.debug(f"Checking if key {cache_key} is in cache...") + assert not cache.has(cache_key), "The key should not be in cache" + if check_cache_size: + assert cache.size() == cache_size, "Unexpected cache size" + + # check last test build + logger.debug("\n\nGetting latest build: %r", i.last_test_build) + if not multithreaded: + i.testing_service.get_test_builds.assert_called_once(), "i.testing_service.get_test_builds should be called once" + logger.debug("\n\nGetting latest build... 
DONE\n\n") + + # restore original method + i.testing_service.get_test_build = get_test_build_method + i.testing_service.get_test_builds = get_test_builds_method + + ############################################################################ + # check latest build + ############################################################################ + logger.debug("\n\nGetting latest build: %r", i.last_test_build) + if check_cache_size: + assert cache.size() == cache_size, "Unexpected cache size" + + # check transactions + transaction_keys = t.keys() + logger.debug("Transaction keys (# %r): %r", len(transaction_keys), transaction_keys) + assert len(transaction_keys) == t.size(), "Unexpected transaction size" + + # check the cache after the transaction is completed + if check_cache_size: + cache_size = cache.size() + assert len(transaction_keys) == cache_size, "Unexpected cache size: it should be equal to the transaction size" + sleep(2) + assert cache.size() > 0, "Cache should not be empty" + logger.debug(cache.keys()) + + # prepare return value + return_value = [] + for tr in transactions: + return_value.append({ + 'transaction': str(tr), + 'keys': tr.keys() + }) + if not multithreaded: + assert len(cache.backend.keys("lock*")) == 0, "No lock should be set" + else: + if results: + assert results, "Results should not be None" + results[index]['result'].extend(return_value) + logger.debug("Return value: %r", return_value) + return return_value + + +def test_cache_task_last_build(app_context, redis_cache, user1): + valid_workflow = 'sort-and-change-case' + logger.debug("Cache content: %r", cache.keys()) + cache.clear() + assert cache.size() == 0, "Cache should be empty" + _, workflow = utils.pick_and_register_workflow(user1, valid_workflow) + assert workflow, "Workflow should be set" + + from lifemonitor.tasks.tasks import check_last_build + check_last_build() + + sleep(2) + assert cache.size() > 0, "Cache should not be empty" + logger.debug(cache.keys()) + assert len(cache.backend.keys("lock*")) == 0, "No lock should be set" + + +def check_results(results): + logger.debug("\n\n\nResults: %r", results) + assert len(cache.backend.keys(pattern="locks*")) == 0, "Locks should not be in cache" + for i in range(0, len(results)): + if i == len(results) - 1: + break + p1 = results[i] + p2 = results[i + 1] + r1 = p1['result'] + r2 = p2['result'] + processes = f"'{p1['index']}' and '{p2['index']}'" + logger.debug(f"Checking process/thread {processes}") + logger.debug("Number of transactions: %r => %r ||| %r => %r", p1['index'], len(r1), p2['index'], len(r2)) + assert len(r1) == len(r2), f"Process/thread {processes} should have the same number of transactions" + for tdx in range(0, len(r1)): + logger.debug("Checking transactions %r and %r", + r1[tdx]['transaction'], r2[tdx]['transaction']) + assert len(r1[tdx]['keys']) == len(r2[tdx]['keys']), \ + f"Transactions {r1[tdx]['transaction']} and {r2[tdx]['transaction']} should have the same number of keys" + logger.debug("\n\nChecking Results DONE\n\n\n",) + +
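+# The multi-thread and multi-process tests below run cache_last_build_update concurrently and then use check_results to verify that every worker recorded transactions with the same number of keys, i.e. that the Redis locks serialized access to the shared cache entries.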
+def test_cache_last_build_update_multi_thread(app_context, redis_cache, user1): + # set up a workflow + w = setup_test_cache_last_build_update(app_context, redis_cache, user1) + logger.debug("Workflow %r", w) + # set up threads + results = [] + number_of_threads = 3 + for index in range(number_of_threads): + t = threading.Thread( + target=cache_last_build_update, name=f"T{index}", args=(app_context.app, w, user1), + kwargs={ + "check_cache_size": False, + "index": index, + "multithreaded": True, + "results": results}) + results.append({ + 't': t, + 'index': str(index), + "result": [] + }) + t.start() + sleep(2) + + # wait for results + for tdata in results: + t = tdata['t'] + t.join() + # check results + sleep(2) + check_results(results) + + +def test_cache_last_build_update_multi_process(app_context, redis_cache, user1): + # set up a workflow + w = setup_test_cache_last_build_update(app_context, redis_cache, user1) + # set up processes + processes = 3 + results = [] + manager = Manager() + for index in range(processes): + p = Process(target=cache_last_build_update, args=(app_context.app, w, user1), + kwargs={"check_cache_size": False, + "index": index, + "multithreaded": True, + "results": results}) + results.append({ + 'p': p, + 'index': str(index), + 'result': manager.list() + }) + p.start() + sleep(1) + # wait for results + for pdata in results: + p = pdata['p'] + p.join() + # check results + sleep(4) + check_results(results) + + +def cache_transaction(transaction, index, name, results): + sleep(5) + logger.debug(f"Cache transaction: {name}") + with cache.transaction(f"T-{index}") as t: + current_transaction = cache.get_current_transaction() + logger.debug("Current transaction: %r", current_transaction) + assert current_transaction != transaction, "Unexpected transaction: transaction should be different from that on the main process" + assert t == current_transaction, "Unexpected transaction" + + with cache.transaction() as tx: + assert tx == t, "Unexpected transaction: it should be the same one started in this thread" + + key = "TEST" + result = transaction.get(key) + if result is None: + logger.debug(f"Value {key} not set in cache...") + with tx.lock(key, blocking=True, timeout=Timeout.NONE): + result = transaction.get(key) + if not result: + logger.debug("Cache empty: getting value from the actual function...") + sleep(5) + result = f"result-of-index: {index}" + if index != -1: + result = cache_transaction(transaction, -1, f"{index}-NONE", results) + unless = None + logger.debug("Checking unless function: %r", unless) + if unless is None or unless is False or callable(unless) and not unless(result): + transaction.set(key, result, timeout=Timeout.NONE) + else: + logger.debug("Don't set value in cache due to unless=%r", "None" if unless is None else "True") + + +def test_cache_transaction_multi_thread(app_context, redis_cache, user1): + # set up threads + logger.debug("Test cache transaction...") + number_of_threads = 4 + results = [] + with cache.transaction() as transaction: + print("The transaction: %r" % transaction) + + for index in range(number_of_threads): + t = threading.Thread( + target=cache_transaction, name=f"T{index}", args=(transaction, index, f"{index}", results), + kwargs={}) + results.append({ + "t": t, + "result": [] + }) + t.start() + # wait for results + for tdata in results: + t = tdata['t'] + t.join() + # check results + sleep(2) + # check_results(results) + + logger.debug("Test cache transaction...
DONE") diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 695f050d2..ad2b27349 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -20,11 +20,11 @@ import os import tempfile - -import pytest +from unittest.mock import MagicMock, Mock import lifemonitor.exceptions as lm_exceptions import lifemonitor.utils as utils +import pytest def test_download_url_404(): @@ -32,3 +32,8 @@ def test_download_url_404(): with pytest.raises(lm_exceptions.DownloadException) as excinfo: _ = utils.download_url('http://httpbin.org/status/404', os.path.join(d, 'get_404')) assert excinfo.value.status == 404 + + +class SerializableMock(MagicMock): + def __reduce__(self): + return (Mock, ())
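Note on SerializableMock, added above in tests/unit/test_utils.py: plain MagicMock objects cannot be pickled, which would break the multi-process cache tests that ship mocked testing-service methods to worker processes. Overriding __reduce__ to return (Mock, ()) lets pickle serialize the object by rebuilding it as a bare Mock on the receiving side. A minimal, self-contained sketch of the idea (illustrative only, not part of the diff):

    import pickle
    from unittest.mock import MagicMock, Mock

    class SerializableMock(MagicMock):
        def __reduce__(self):
            # pickle reconstructs this object as a plain Mock()
            return (Mock, ())

    m = SerializableMock(return_value=42)
    clone = pickle.loads(pickle.dumps(m))  # succeeds; clone is a bare Mock
    assert isinstance(clone, Mock)

The trade-off is that the reconstructed mock loses its configured return values and call history, which is acceptable here because each worker re-stubs its mocks locally.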