diff --git a/.woodpecker/build-linux.yml b/.woodpecker/build-linux.yml index 8cdd984..ef4ecfd 100644 --- a/.woodpecker/build-linux.yml +++ b/.woodpecker/build-linux.yml @@ -3,28 +3,33 @@ matrix: - linux/amd64 - linux/arm64 -platform: ${platform} - labels: type: exec + platform: ${platform} -pipeline: +steps: - name: publish-linux image: bash commands: - - export VERSION=$([ -z $CI_COMMIT_TAG ] && echo latest || echo $CI_COMMIT_TAG)-$(sed 's#/#-#g' <<< $CI_SYSTEM_ARCH) + - export VERSION=$([ -z $CI_COMMIT_TAG ] && echo latest || echo $CI_COMMIT_TAG)-$(sed 's#/#-#g' <<< $CI_SYSTEM_PLATFORM) - echo $CONTAINER_PASSWORD | podman login $DRYCC_REGISTRY --username $CONTAINER_USERNAME --password-stdin > /dev/null 2>&1 - make podman-build podman-immutable-push - secrets: - - codename - - dev_registry - - drycc_registry - - container_username - - container_password + environment: + CODENAME: + from_secret: codename + DEV_REGISTRY: + from_secret: dev_registry + DRYCC_REGISTRY: + from_secret: drycc_registry + CONTAINER_USERNAME: + from_secret: container_username + CONTAINER_PASSWORD: + from_secret: container_password when: event: - push - tag + - cron depends_on: - test-linux \ No newline at end of file diff --git a/.woodpecker/chart.yaml b/.woodpecker/chart.yaml index 3204cf5..d40739c 100644 --- a/.woodpecker/chart.yaml +++ b/.woodpecker/chart.yaml @@ -1,30 +1,34 @@ -platform: linux/amd64 - labels: type: exec + platform: linux/amd64 -pipeline: +steps: - name: generate-chart - type: local image: bash commands: - export VERSION=$(sed 's#v##' <<< $CI_COMMIT_TAG) - export IMAGE_TAG=$([ ! -z $CI_COMMIT_TAG ] && echo \"$VERSION\" || echo \"canary\") - - export APP_VERSION=$([ ! -z $CI_COMMIT_TAG ] && echo $VERSION || echo 1.0.0) + - export APP_VERSION=$([ -z $CI_COMMIT_TAG ] && echo $CI_COMMIT_SHA || echo $VERSION) - export CHART_VERSION=$([ -z $CI_COMMIT_TAG ] && echo 1.0.0 || echo $VERSION) - sed -i "s/imageTag:\ \"canary\"/imageTag:\ $IMAGE_TAG/g" charts/$${CI_REPO_NAME}/values.yaml + - sed -i s#{{repository}}#oci://$DRYCC_REGISTRY/$([ -z $CI_COMMIT_TAG ] && echo charts-testing || echo charts)#g charts/$${CI_REPO_NAME}/Chart.yaml - helm package -u charts/$${CI_REPO_NAME} --version $CHART_VERSION --app-version $APP_VERSION - echo $CONTAINER_PASSWORD | helm registry login $DRYCC_REGISTRY -u $CONTAINER_USERNAME --password-stdin - helm push $${CI_REPO_NAME}-$CHART_VERSION.tgz oci://$DRYCC_REGISTRY/$([ -z $CI_COMMIT_TAG ] && echo charts-testing || echo charts) - secrets: - - dev_registry - - drycc_registry - - container_username - - container_password + environment: + DEV_REGISTRY: + from_secret: dev_registry + DRYCC_REGISTRY: + from_secret: drycc_registry + CONTAINER_USERNAME: + from_secret: container_username + CONTAINER_PASSWORD: + from_secret: container_password when: event: - push - tag + - cron depends_on: - manifest \ No newline at end of file diff --git a/.woodpecker/manifest.yml b/.woodpecker/manifest.yml index 2d5c4e5..7330ea0 100644 --- a/.woodpecker/manifest.yml +++ b/.woodpecker/manifest.yml @@ -1,20 +1,21 @@ -platform: linux/amd64 - labels: type: exec + platform: linux/amd64 -pipeline: +steps: - name: generate-manifest image: bash commands: - sed -i "s/{{project}}/$${CI_REPO_NAME}/g" .woodpecker/manifest.tmpl - sed -i "s/registry.drycc.cc/$${DRYCC_REGISTRY}/g" .woodpecker/manifest.tmpl - secrets: - - drycc_registry + environment: + DRYCC_REGISTRY: + from_secret: drycc_registry when: event: - tag - push + - cron - name: publish-manifest image: bash @@ -27,13 +28,16 @@ pipeline: -v $(pwd):$(pwd) -w $(pwd) docker.io/plugins/manifest - secrets: - - container_username - - container_password + environment: + CONTAINER_USERNAME: + from_secret: container_username + CONTAINER_PASSWORD: + from_secret: container_password when: event: - tag - push + - cron depends_on: - build-linux diff --git a/.woodpecker/test-linux.yml b/.woodpecker/test-linux.yml index 313179e..ab8ee69 100644 --- a/.woodpecker/test-linux.yml +++ b/.woodpecker/test-linux.yml @@ -3,20 +3,24 @@ matrix: - linux/amd64 - linux/arm64 -platform: ${platform} - labels: type: exec + platform: ${platform} -pipeline: +steps: - name: test-linux image: bash commands: - - make test - secrets: - - codename - - dev_registry + - make test upload-coverage + environment: + CODENAME: + from_secret: codename + DEV_REGISTRY: + from_secret: dev_registry + CODECOV_TOKEN: + from_secret: codecov_token when: event: - push - tag + - cron diff --git a/Makefile b/Makefile index 26df599..fed80c8 100644 --- a/Makefile +++ b/Makefile @@ -61,6 +61,6 @@ test-integration: upload-coverage: $(eval CI_ENV := $(shell curl -s https://codecov.io/env | bash)) - podman run --rm ${CI_ENV} -v ${CURDIR}:/tmp/test -w /tmp/test/rootfs ${IMAGE}.test /tmp/test/rootfs/bin/upload-coverage + podman run --rm ${CI_ENV} -v ${CURDIR}:/tmp/test -w /tmp/test/rootfs -e CODECOV_TOKEN=${CODECOV_TOKEN} ${IMAGE}.test /tmp/test/rootfs/bin/upload-coverage .PHONY: check-kubectl check-podman build podman-build podman-build-test deploy clean commit-hook full-clean test test-style test-unit test-functional test-integration upload-coverage diff --git a/charts/helmbroker/Chart.yaml b/charts/helmbroker/Chart.yaml index 4ef1dc8..6e8d2aa 100644 --- a/charts/helmbroker/Chart.yaml +++ b/charts/helmbroker/Chart.yaml @@ -6,9 +6,10 @@ dependencies: - name: common repository: oci://registry.drycc.cc/charts version: ~1.1.2 - - name: rabbitmq - repository: oci://registry.drycc.cc/charts-testing + - name: valkey + repository: {{repository}} version: x.x.x + condition: valkey.enabled description: Drycc Workflow helmbroker. maintainers: - name: Drycc Team diff --git a/charts/helmbroker/templates/_helpers.tpl b/charts/helmbroker/templates/_helpers.tpl index 1fa77d2..3e9e40f 100644 --- a/charts/helmbroker/templates/_helpers.tpl +++ b/charts/helmbroker/templates/_helpers.tpl @@ -3,26 +3,24 @@ env: - name: "TZ" value: {{ .Values.time_zone | default "UTC" | quote }} -- name: USERNAME +- name: HELMBROKER_USERNAME value: {{ if .Values.username | default "" | ne "" }}{{ .Values.username }}{{ else }}{{ randAlphaNum 32 }}{{ end }} -- name: PASSWORD +- name: HELMBROKER_PASSWORD value: {{ if .Values.password | default "" | ne "" }}{{ .Values.password }}{{ else }}{{ randAlphaNum 32 }}{{ end }} -{{- if (.Values.rabbitmqUrl) }} -- name: DRYCC_RABBITMQ_URL - value: {{ .Values.rabbitmqUrl }} -{{- else if eq .Values.global.rabbitmqLocation "on-cluster" }} -- name: "DRYCC_RABBITMQ_USERNAME" +{{- if (.Values.valkeyUrl) }} +- name: HELMBROKER_VALKEY_URL valueFrom: secretKeyRef: - name: rabbitmq-creds - key: username -- name: "DRYCC_RABBITMQ_PASSWORD" + name: helmbroker-creds + key: valkey-url +{{- else if .Values.valkey.enabled }} +- name: VALKEY_PASSWORD valueFrom: secretKeyRef: - name: rabbitmq-creds + name: valkey-creds key: password -- name: "DRYCC_RABBITMQ_URL" - value: "amqp://$(DRYCC_RABBITMQ_USERNAME):$(DRYCC_RABBITMQ_PASSWORD)@drycc-rabbitmq.{{$.Release.Namespace}}.svc.{{$.Values.global.clusterDomain}}:5672/drycc" +- name: HELMBROKER_VALKEY_URL + value: "redis://:$(VALKEY_PASSWORD)@drycc-valkey:26379/0?master_set=drycc" {{- end }} {{- range $key, $value := .Values.environment }} - name: {{ $key }} @@ -30,21 +28,6 @@ env: {{- end }} {{- end }} - -{{/* Generate helmbroker deployment limits */}} -{{- define "helmbroker.limits" -}} -{{- if or (.Values.limits_cpu) (.Values.limits_memory) }} -resources: - limits: -{{- if (.Values.limits_cpu) }} - cpu: {{.Values.limits_cpu}} -{{- end }} -{{- if (.Values.limits_memory) }} - memory: {{.Values.limits_memory}} -{{- end }} -{{- end }} -{{- end }} - {{/* Generate helmbroker deployment volumeMounts */}} {{- define "helmbroker.volumeMounts" }} volumeMounts: diff --git a/charts/helmbroker/templates/helmbroker-celery-deployment.yaml b/charts/helmbroker/templates/helmbroker-celery-deployment.yaml index d29b519..1b39b78 100644 --- a/charts/helmbroker/templates/helmbroker-celery-deployment.yaml +++ b/charts/helmbroker/templates/helmbroker-celery-deployment.yaml @@ -7,7 +7,7 @@ metadata: annotations: component.drycc.cc/version: {{ .Values.imageTag }} spec: - replicas: {{ .Values.celeryReplicas }} + replicas: {{ .Values.celery.replicas }} strategy: rollingUpdate: maxSurge: 1 @@ -33,18 +33,32 @@ spec: args: - netcat - -v - - -a - - $(DRYCC_HELMBROKER_SERVICE_HOST):$(DRYCC_HELMBROKER_SERVICE_PORT) + - -u + - http://drycc-helmbroker {{- include "helmbroker.envs" . | indent 10 }} containers: - - name: drycc-helmbroker-celery - image: {{.Values.imageRegistry}}/{{.Values.imageOrg}}/helmbroker:{{.Values.imageTag}} - imagePullPolicy: {{.Values.imagePullPolicy}} - args: - - /bin/bash - - -c - - celery -A helmbroker worker --autoscale=32,1 --loglevel=info - {{- include "helmbroker.limits" . | indent 10 }} - {{- include "helmbroker.envs" . | indent 10 }} - {{- include "helmbroker.volumeMounts" . | indent 10 }} + {{- range $key := (list "low" "middle" "high") }} + - name: drycc-helmbroker-celery-{{$key}} + image: {{$.Values.imageRegistry}}/{{$.Values.imageOrg}}/helmbroker:{{$.Values.imageTag}} + imagePullPolicy: {{$.Values.imagePullPolicy}} + {{- if $.Values.diagnosticMode.enabled }} + command: {{- include "common.tplvalues.render" (dict "value" $.Values.diagnosticMode.command "context" $) | nindent 8 }} + args: {{- include "common.tplvalues.render" (dict "value" $.Values.diagnosticMode.args "context" $) | nindent 8 }} + {{- else }} + args: + - /bin/bash + - -c + - celery --app helmbroker worker -n {{uuidv4}}@%h --queues helmbroker.{{$key}} --autoscale=32,1 --loglevel=WARNING + {{- end }} + {{- with index $.Values "celery" "resources" }} + resources: + {{- toYaml . | nindent 10 }} + {{- end }} + {{- include "helmbroker.envs" $ | indent 8 }} + {{- include "helmbroker.volumeMounts" $ | indent 8 }} + {{- end }} {{- include "helmbroker.volumes" . | indent 6 }} + securityContext: + fsGroup: 1001 + runAsGroup: 1001 + runAsUser: 1001 diff --git a/charts/helmbroker/templates/helmbroker-cm.yaml b/charts/helmbroker/templates/helmbroker-cm.yaml index 05f6e0d..d13e2ee 100644 --- a/charts/helmbroker/templates/helmbroker-cm.yaml +++ b/charts/helmbroker/templates/helmbroker-cm.yaml @@ -13,6 +13,8 @@ data: - name: {{ .name }} url: {{ .url }} {{- end }} + {{- if .Values.addonValues }} addon-values: | {{- (tpl .Values.addonValues $) | nindent 4 }} + {{- end }} {{- end }} diff --git a/charts/helmbroker/templates/helmbroker-cronjob-daily.yaml b/charts/helmbroker/templates/helmbroker-cronjob-daily.yaml index c608862..825e3c8 100644 --- a/charts/helmbroker/templates/helmbroker-cronjob-daily.yaml +++ b/charts/helmbroker/templates/helmbroker-cronjob-daily.yaml @@ -21,10 +21,16 @@ spec: - image: {{.Values.imageRegistry}}/{{.Values.imageOrg}}/helmbroker:{{.Values.imageTag}} imagePullPolicy: {{.Values.imagePullPolicy}} name: drycc-helmbroker-cleaner + {{- if .Values.diagnosticMode.enabled }} + command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 14 }} + args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 14 }} + {{- else }} args: - /bin/bash - -c - python -m helmbroker.cleaner + python -m helmbroker.database.fetch + {{- end }} {{- include "helmbroker.envs" . | indent 12 }} {{- include "helmbroker.volumeMounts" . | indent 12 }} {{- include "helmbroker.volumes" . | indent 10 }} \ No newline at end of file diff --git a/charts/helmbroker/templates/helmbroker-deployment.yaml b/charts/helmbroker/templates/helmbroker-deployment.yaml index 8de5661..45cc4cb 100644 --- a/charts/helmbroker/templates/helmbroker-deployment.yaml +++ b/charts/helmbroker/templates/helmbroker-deployment.yaml @@ -7,7 +7,7 @@ metadata: annotations: component.drycc.cc/version: {{ .Values.imageTag }} spec: - replicas: {{ .Values.replicas }} + replicas: {{ .Values.api.replicas }} strategy: rollingUpdate: maxSurge: 1 @@ -27,15 +27,6 @@ spec: nodeAffinity: {{- include "common.affinities.nodes" (dict "type" .Values.api.nodeAffinityPreset.type "key" .Values.api.nodeAffinityPreset.key "values" .Values.api.nodeAffinityPreset.values ) | nindent 10 }} serviceAccount: drycc-helmbroker initContainers: - - name: loader - image: {{.Values.imageRegistry}}/{{.Values.imageOrg}}/helmbroker:{{.Values.imageTag}} - imagePullPolicy: {{.Values.imagePullPolicy}} - args: - - /bin/bash - - -c - - python -m helmbroker.loader - {{- include "helmbroker.envs" . | indent 10 }} - {{- include "helmbroker.volumeMounts" . | indent 10 }} - name: drycc-helmbroker-init image: registry.drycc.cc/drycc/python-dev:latest imagePullPolicy: {{.Values.imagePullPolicy}} @@ -43,29 +34,51 @@ spec: - netcat - -v - -u - - $(DRYCC_RABBITMQ_URL) + - $(HELMBROKER_VALKEY_URL) {{- include "helmbroker.envs" . | indent 10 }} - containers: - - name: drycc-helmbroker + - name: drycc-helmbroker-fetch image: {{.Values.imageRegistry}}/{{.Values.imageOrg}}/helmbroker:{{.Values.imageTag}} imagePullPolicy: {{.Values.imagePullPolicy}} - livenessProbe: - httpGet: - path: /healthz - port: 8000 - initialDelaySeconds: 30 - timeoutSeconds: 10 - readinessProbe: - httpGet: - path: /readiness - port: 8000 - initialDelaySeconds: 30 - timeoutSeconds: 10 - periodSeconds: 5 - ports: - - containerPort: 8000 - name: http - {{- include "helmbroker.limits" . | indent 10 }} + args: + - /bin/bash + - -c + - python -m helmbroker.database.fetch {{- include "helmbroker.envs" . | indent 10 }} {{- include "helmbroker.volumeMounts" . | indent 10 }} + containers: + - name: drycc-helmbroker + image: {{.Values.imageRegistry}}/{{.Values.imageOrg}}/helmbroker:{{.Values.imageTag}} + imagePullPolicy: {{.Values.imagePullPolicy}} + {{- if .Values.diagnosticMode.enabled }} + command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 8 }} + args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 8 }} + {{- end }} + {{- if not .Values.diagnosticMode.enabled }} + livenessProbe: + httpGet: + path: /healthz + port: 8000 + initialDelaySeconds: 30 + timeoutSeconds: 10 + readinessProbe: + httpGet: + path: /readiness + port: 8000 + initialDelaySeconds: 30 + timeoutSeconds: 10 + periodSeconds: 5 + {{- end }} + ports: + - containerPort: 8000 + name: http + {{- with index .Values "api" "resources" }} + resources: + {{- toYaml . | nindent 10 }} + {{- end }} + {{- include "helmbroker.envs" . | indent 8 }} + {{- include "helmbroker.volumeMounts" . | indent 8 }} {{- include "helmbroker.volumes" . | indent 6 }} + securityContext: + fsGroup: 1001 + runAsGroup: 1001 + runAsUser: 1001 diff --git a/charts/helmbroker/templates/helmbroker-secret-creds.yaml b/charts/helmbroker/templates/helmbroker-secret-creds.yaml new file mode 100644 index 0000000..29b1bc3 --- /dev/null +++ b/charts/helmbroker/templates/helmbroker-secret-creds.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Secret +metadata: + name: helmbroker-creds + labels: + heritage: drycc +data: + {{- if (.Values.valkeyUrl) }} + valkey-url: {{ .Values.valkeyUrl | b64enc }} + {{- end }} \ No newline at end of file diff --git a/charts/helmbroker/values.yaml b/charts/helmbroker/values.yaml index 1be72e0..65b9f54 100644 --- a/charts/helmbroker/values.yaml +++ b/charts/helmbroker/values.yaml @@ -2,38 +2,57 @@ imageOrg: "drycc-addons" imageTag: "canary" imageRegistry: "registry.drycc.cc" imagePullPolicy: "Always" -replicas: 1 -# limitsCpu: "100m" -# limitsMemory: "50Mi" + + +## Enable diagnostic mode +## +diagnosticMode: + ## @param diagnosticMode.enabled Enable diagnostic mode (all probes will be disabled and the command will be overridden) + ## + enabled: false + ## @param diagnosticMode.command Command to override all containers + ## + command: + - sleep + ## @param diagnosticMode.args Args to override all containers + ## + args: + - infinity ## config the helm-broker repositories repositories: -- name: drycc-helm-broker - url: https://github.com/drycc/addons/releases/download/latest/index.yaml - -celeryReplicas: 1 + - name: drycc-helm-broker + url: https://github.com/drycc/addons/releases/download/latest/index.yaml # broker_credentials: # Optional Usernames and passwords that will be required to communicate with service broker username: admin password: admin -# Configuring this will no longer use the built-in rabbitmq component -rabbitmqUrl: "" +# Configuring this will no longer use the built-in valkey component +valkeyUrl: "" # Any custom controller environment variables # can be specified as key-value pairs under environment # this is usually a non required setting. environment: - # DRYCC_DEBUG: true - # HELMBROKER_ROOT: /etc/helmbroker + # HELMBROKER_DEBUG: true + # HELMBROKER_CONFIG_ROOT: /etc/helmbroker api: + replicas: 1 + resources: {} + # limits: + # cpu: 200m + # memory: 50Mi + # requests: + # cpu: 100m + # memory: 30Mi nodeAffinityPreset: key: "drycc.cc/node" type: "soft" values: - - "true" + - "true" podAffinityPreset: type: "" extraMatchLabels: @@ -44,11 +63,19 @@ api: app: "drycc-helmbroker" celery: + replicas: 1 + resources: {} + # limits: + # cpu: 200m + # memory: 50Mi + # requests: + # cpu: 100m + # memory: 30Mi nodeAffinityPreset: key: "drycc.cc/node" type: "soft" values: - - "true" + - "true" podAffinityPreset: type: "" extraMatchLabels: @@ -68,20 +95,5 @@ persistence: storageClass: "" volumeName: "" -global: - # Set the location of Workflow's rabbitmq instance - # Valid values are: - # - on-cluster: Run Rabbitmq within the Kubernetes cluster - # - off-cluster: Run Rabbitmq outside the Kubernetes cluster (configure in controller section) - rabbitmqLocation: "on-cluster" - # Enable usage of RBAC authorization mode - # - # Valid values are: - # - true: all RBAC-related manifests will be installed (in case your cluster supports RBAC) - # - false: no RBAC-related manifests will be installed - rbac: true - # A domain name consists of one or more parts. - # Periods (.) are used to separate these parts. - # Each part must be 1 to 63 characters in length and can contain lowercase letters, digits, and hyphens (-). - # It must start and end with a lowercase letter or digit. - clusterDomain: "cluster.local" +valkey: + enabled: true diff --git a/rootfs/Dockerfile b/rootfs/Dockerfile index 81562a6..85d3134 100644 --- a/rootfs/Dockerfile +++ b/rootfs/Dockerfile @@ -4,9 +4,9 @@ FROM registry.drycc.cc/drycc/base:${CODENAME} ENV DRYCC_UID=1001 \ DRYCC_GID=1001 \ DRYCC_HOME_DIR=/workspace \ - PYTHON_VERSION="3.11" \ - HELM_VERSION="3.12.1" \ - KUBECTL_VERSION="1.27.3" + PYTHON_VERSION="3.14" \ + HELM_VERSION="4.1.3" \ + KUBECTL_VERSION="1.35.3" RUN groupadd drycc --gid ${DRYCC_GID} \ && useradd drycc -u ${DRYCC_UID} -g ${DRYCC_GID} -s /bin/bash -m -d ${DRYCC_HOME_DIR} @@ -15,7 +15,7 @@ COPY . ${DRYCC_HOME_DIR} WORKDIR ${DRYCC_HOME_DIR} RUN buildDeps='musl-dev' \ - && install-packages ${buildDeps} openssl ca-certificates \ + && install-packages ${buildDeps} patch openssl ca-certificates \ && install-stack python ${PYTHON_VERSION} \ && install-stack helm ${HELM_VERSION} \ && install-stack kubectl ${KUBECTL_VERSION} && . init-stack \ diff --git a/rootfs/Dockerfile.test b/rootfs/Dockerfile.test index 8cbb6e6..fe48d53 100644 --- a/rootfs/Dockerfile.test +++ b/rootfs/Dockerfile.test @@ -4,9 +4,9 @@ FROM registry.drycc.cc/drycc/base:${CODENAME} ENV DRYCC_UID=1001 \ DRYCC_GID=1001 \ DRYCC_HOME_DIR=/workspace \ - PYTHON_VERSION="3.11" \ - HELM_VERSION="3.12.1" \ - KUBECTL_VERSION="1.27.3" + PYTHON_VERSION="3.14" \ + HELM_VERSION="4.1.3" \ + KUBECTL_VERSION="1.35.3" RUN groupadd drycc --gid ${DRYCC_GID} \ && useradd drycc -u ${DRYCC_UID} -g ${DRYCC_GID} -s /bin/bash -m -d ${DRYCC_HOME_DIR} @@ -15,7 +15,9 @@ COPY . ${DRYCC_HOME_DIR} WORKDIR ${DRYCC_HOME_DIR} RUN buildDeps='musl-dev'; \ - install-packages ${buildDeps} openssl ca-certificates \ + install-packages ${buildDeps} git openssl ca-certificates \ + && curl -SsL https://cli.codecov.io/latest/$([ $(dpkg --print-architecture) == "arm64" ] && echo linux-arm64 || echo linux)/codecov -o /usr/local/bin/codecov \ + && chmod +x /usr/local/bin/codecov \ && install-stack python ${PYTHON_VERSION} \ && install-stack helm ${HELM_VERSION} \ && install-stack kubectl ${KUBECTL_VERSION} && . init-stack \ diff --git a/rootfs/bin/test-unit b/rootfs/bin/test-unit index ef9755f..1d1d54e 100755 --- a/rootfs/bin/test-unit +++ b/rootfs/bin/test-unit @@ -3,4 +3,4 @@ # This script is designed to be run inside the container # -python -m unittest tests/test_*.py \ No newline at end of file +coverage run -m unittest tests/test_*.py diff --git a/rootfs/bin/upload-coverage b/rootfs/bin/upload-coverage index 53433f3..492452b 100755 --- a/rootfs/bin/upload-coverage +++ b/rootfs/bin/upload-coverage @@ -6,4 +6,8 @@ # fail hard and fast even on pipelines set -eou pipefail -codecov --required +coverage report -m > coverage.txt + +if [[ -n $CODECOV_TOKEN ]]; then + codecov upload-process --plugin=noop -t "$CODECOV_TOKEN" +fi diff --git a/rootfs/dev_requirements.txt b/rootfs/dev_requirements.txt index b46d2dc..ca19f67 100644 --- a/rootfs/dev_requirements.txt +++ b/rootfs/dev_requirements.txt @@ -1,13 +1,10 @@ # test module # test # Run "make test-unit" for the % of code exercised during tests -coverage==7.2.7 +coverage==7.6.1 # Run "make test-style" to check python syntax and style -flake8==6.0.0 - -# code coverage report at https://codecov.io/github/drycc/controller -codecov==2.1.13 +flake8==7.1.1 # mock out python-requests, mostly k8s -requests-mock==1.11.0 +requests-mock==1.12.1 diff --git a/rootfs/helmbroker/broker.py b/rootfs/helmbroker/broker.py index a8dfe75..c920789 100644 --- a/rootfs/helmbroker/broker.py +++ b/rootfs/helmbroker/broker.py @@ -1,6 +1,6 @@ import os -import shutil import logging +import time from typing import Union, List, Optional from openbrokerapi.catalog import ServicePlan @@ -13,11 +13,15 @@ UpdateDetails, UpdateServiceSpec, DeprovisionDetails, \ DeprovisionServiceSpec, LastOperation, OperationState -from .utils import get_instance_path, get_chart_path, get_plan_path, \ - get_addon_path, get_addon_updateable, get_addon_bindable, InstanceLock, \ - load_instance_meta, load_binding_meta, load_addons_meta, \ - get_addon_allow_paras, verify_parameters, get_addon_archive -from .tasks import provision, bind, deprovision, update +from .utils import verify_parameters, new_instance_lock, verify_parameters_by_plan +from .database.fetch import fetch_chart_plan +from .database.query import get_instance_path, get_chart_path, get_plan_path, \ + get_addon_updateable, get_addon_bindable, get_addon_allow_params, \ + get_addon_archive, get_binding_file, get_instance_file +from .database.metadata import load_instance_meta, load_binding_meta, load_addons_meta, \ + save_instance_meta +from .tasks import provision, bind, deprovision, update, unbind +from .config import INSTANCE_NAME_LENS logger = logging.getLogger(__name__) @@ -40,6 +44,11 @@ def provision(self, details: ProvisionDetails, async_allowed: bool, **kwargs) -> ProvisionedServiceSpec: + logger.debug(f"*** provision instance {instance_id}") + # verify instance_name length + if len(details.context["instance_name"]) > INSTANCE_NAME_LENS: + raise ErrBadRequest( + msg=f"The length of the instance name cannot exceed {INSTANCE_NAME_LENS}.") instance_path = get_instance_path(instance_id) if os.path.exists(instance_path): raise ErrInstanceAlreadyExists() @@ -48,9 +57,9 @@ def provision(self, if get_addon_archive(details.service_id): raise ErrBadRequest( msg="This addon has archived.") - allow_paras = get_addon_allow_paras(details.service_id) + allow_params = get_addon_allow_params(details.service_id) not_allow_keys, required_keys = verify_parameters( - allow_paras, details.parameters) + allow_params, details.parameters) if not_allow_keys: raise ErrBadRequest( msg="parameters %s does not allowed" % not_allow_keys) @@ -58,12 +67,12 @@ def provision(self, raise ErrBadRequest( msg="required parameters %s not exists" % required_keys) os.makedirs(instance_path, exist_ok=True) - chart_path, plan_path = ( - get_chart_path(instance_id), get_plan_path(instance_id)) - addon_chart_path, addon_plan_path = ( - get_addon_path(details.service_id, details.plan_id)) - shutil.copytree(addon_chart_path, chart_path) - shutil.copytree(addon_plan_path, plan_path) + chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) + fetch_chart_plan(details.service_id, chart_path, details.plan_id, plan_path) + # verify instance-schema + msg = verify_parameters_by_plan(instance_id, details.parameters) + if msg: + raise ErrBadRequest(msg) provision.delay(instance_id, details) return ProvisionedServiceSpec(state=ProvisionState.IS_ASYNC) @@ -84,6 +93,7 @@ def bind(self, async_allowed: bool, **kwargs ) -> Binding: + logger.debug(f"*** bind instance {instance_id}") is_addon_bindable = get_addon_bindable(details.service_id) if not is_addon_bindable: raise ErrBadRequest( @@ -96,9 +106,6 @@ def bind(self, instance_path = get_instance_path(instance_id) if os.path.exists(f'{instance_path}/bind.json'): raise ErrBindingAlreadyExists() - chart_path, plan_path = ( - get_chart_path(instance_id), get_plan_path(instance_id)) - shutil.copy(f'{plan_path}/bind.yaml', f'{chart_path}/templates') bind(instance_id, binding_id, details, async_allowed, **kwargs) data = load_binding_meta(instance_id) if data["last_operation"]["state"] == OperationState.SUCCEEDED.value: @@ -114,11 +121,9 @@ def unbind(self, async_allowed: bool, **kwargs ) -> UnbindSpec: - instance_path = get_instance_path(instance_id) - binding_info = f'{instance_path}/binding.json' - if os.path.exists(binding_info): - os.remove(binding_info) - return UnbindSpec(is_async=False) + logger.debug(f"*** unbind instance {instance_id}") + unbind.delay(instance_id) + return UnbindSpec(is_async=True) def update(self, instance_id: str, @@ -126,6 +131,7 @@ def update(self, async_allowed: bool, **kwargs ) -> UpdateServiceSpec: + logger.debug(f"*** update instance {instance_id}") instance_path = get_instance_path(instance_id) if not os.path.exists(instance_path): raise ErrBadRequest(msg="Instance %s does not exist" % instance_id) @@ -133,11 +139,11 @@ def update(self, if not is_plan_updateable: raise ErrBadRequest( msg="Instance %s does not updateable" % instance_id) - allow_paras = get_addon_allow_paras(details.service_id) + allow_params = get_addon_allow_params(details.service_id) logger.debug( f"service instance update parameters: {details.parameters}") not_allow_keys, required_keys = verify_parameters( - allow_paras, details.parameters) + allow_params, details.parameters) if not_allow_keys: raise ErrBadRequest( msg="parameters %s does not allowed" % not_allow_keys) @@ -147,13 +153,17 @@ def update(self, if not async_allowed: raise ErrAsyncRequired() if details.plan_id is not None: - plan_path = get_plan_path(instance_id) - # delete the pre plan - shutil.rmtree(plan_path, ignore_errors=True) - _, addon_plan_path = get_addon_path( - details.service_id, details.plan_id) - # add the new plan - shutil.copytree(addon_plan_path, plan_path) + chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) + fetch_chart_plan(details.service_id, chart_path, details.plan_id, plan_path) + # verify instance-schema + msg = verify_parameters_by_plan(instance_id, details.parameters) + if msg: + raise ErrBadRequest(msg) + data = load_instance_meta(instance_id) + data['last_operation']["state"] = OperationState.IN_PROGRESS.value + data['last_operation']["description"] = ( + f"update {instance_id} in progress at {time.time()}") + save_instance_meta(instance_id, data) update.delay(instance_id, details) return UpdateServiceSpec(is_async=True) @@ -162,9 +172,10 @@ def deprovision(self, details: DeprovisionDetails, async_allowed: bool, **kwargs) -> DeprovisionServiceSpec: + logger.debug(f"*** deprovision instance {instance_id}") if not os.path.exists(get_instance_path(instance_id)): raise ErrInstanceDoesNotExist() - with InstanceLock(instance_id): + with new_instance_lock(instance_id): data = load_instance_meta(instance_id) operation = data["last_operation"]["operation"] if operation == "provision": @@ -181,11 +192,14 @@ def last_operation(self, operation_data: Optional[str], **kwargs ) -> LastOperation: - data = load_instance_meta(instance_id) - return LastOperation( - OperationState(data["last_operation"]["state"]), - data["last_operation"]["description"] - ) + logger.debug(f"*** last_operation instance {instance_id}") + if os.path.exists(get_instance_file(instance_id)): + data = load_instance_meta(instance_id) + return LastOperation( + OperationState(data["last_operation"]["state"]), + data["last_operation"]["description"] + ) + return LastOperation(OperationState.IN_PROGRESS) def last_binding_operation(self, instance_id: str, @@ -193,8 +207,11 @@ def last_binding_operation(self, operation_data: Optional[str], **kwargs ) -> LastOperation: - data = load_binding_meta(instance_id) - return LastOperation( - OperationState(data["last_operation"]["state"]), - data["last_operation"]["description"] - ) + logger.debug(f"*** last_binding_operation instance {instance_id}") + if os.path.exists(get_binding_file(instance_id)): + data = load_binding_meta(instance_id) + return LastOperation( + OperationState(data["last_operation"]["state"]), + data["last_operation"]["description"] + ) + return LastOperation(OperationState.IN_PROGRESS) diff --git a/rootfs/helmbroker/celery.py b/rootfs/helmbroker/celery.py index 2c387f4..45d4181 100644 --- a/rootfs/helmbroker/celery.py +++ b/rootfs/helmbroker/celery.py @@ -1,28 +1,32 @@ import os +from urllib.parse import urlparse, parse_qs, urlencode from kombu import Exchange, Queue from celery import Celery +from .config import VALKEY_URL class Config(object): # Celery Configuration Options - timezone = "Asia/Shanghai" enable_utc = True task_serializer = 'pickle' accept_content = frozenset([ - 'application/data', - 'application/text', - 'application/json', - 'application/x-python-serialize', + 'application/data', + 'application/text', + 'application/json', + 'application/x-python-serialize', ]) task_track_started = True task_time_limit = 30 * 60 worker_max_tasks_per_child = 200 + worker_prefetch_multiplier = 1 result_expires = 24 * 60 * 60 - broker_url = os.environ.get("DRYCC_RABBITMQ_URL", 'amqp://guest:guest@127.0.0.1:5672/') # noqa - broker_connection_retry_on_startup = True - task_default_queue = 'low' + cache_backend = 'django-cache' + task_default_queue = 'helmbroker.middle' task_default_exchange = 'helmbroker.priority' - task_default_routing_key = 'helmbroker.priority.low' + task_default_routing_key = 'helmbroker.priority.middle' + broker_transport_options = {"queue_order_strategy": "sorted"} + task_create_missing_queues = True + task_inherit_parent_priority = True broker_connection_retry_on_startup = True worker_cancel_long_running_tasks_on_connection_loss = True @@ -30,26 +34,58 @@ class Config(object): app = Celery('helmbroker') app.config_from_object(Config()) app.conf.update( + timezone=os.environ.get('TZ', 'UTC'), task_routes={ - 'helmbroker.tasks': { - 'queue': 'low', - 'exchange': 'helmbroker.priority', - 'routing_key': 'helmbroker.priority.high', + 'helmbroker.tasks.provision': { + 'queue': 'helmbroker.high', + 'exchange': 'helmbroker.priority', 'routing_key': 'helmbroker.priority.high', + }, + 'helmbroker.tasks.update': { + 'queue': 'helmbroker.high', + 'exchange': 'helmbroker.priority', 'routing_key': 'helmbroker.priority.high', + }, + 'helmbroker.tasks.bind': { + 'queue': 'helmbroker.high', + 'exchange': 'helmbroker.priority', 'routing_key': 'helmbroker.priority.high', + }, + 'helmbroker.tasks.deprovision': { + 'queue': 'helmbroker.middle', + 'exchange': 'helmbroker.priority', 'routing_key': 'helmbroker.priority.middle', }, }, task_queues=( Queue( - 'low', - exchange=Exchange('helmbroker.priority', type="direct"), + 'helmbroker.low', exchange=Exchange('helmbroker.priority', type="direct"), routing_key='helmbroker.priority.low', - queue_arguments={'x-max-priority': 16}, + ), + Queue( + 'helmbroker.high', exchange=Exchange('helmbroker.priority', type="direct"), + routing_key='helmbroker.priority.high', + ), + Queue( + 'helmbroker.middle', exchange=Exchange('helmbroker.priority', type="direct"), + routing_key='helmbroker.priority.middle', ), ), ) app.autodiscover_tasks(("helmbroker.tasks",)) - - -app.config_from_object(Config()) +url = urlparse(VALKEY_URL) +query = parse_qs(url.query) +broker_transport_options = {"queue_order_strategy": "sorted", "visibility_timeout": 43200} +result_backend_transport_options = {} +if 'master_set' in query: + master_name = query.pop('master_set')[0] + password = url.netloc.split("@")[0].split(":")[1] + kwargs = {'sentinel_kwargs': {'password': password}, 'master_name': master_name} + broker_transport_options.update(kwargs) + result_backend_transport_options.update(kwargs) + VALKEY_URL = f"sentinel://{url.netloc}{url.path}?{urlencode(query)}" +app.conf.update( + broker_url=VALKEY_URL, + result_backend=VALKEY_URL, + broker_transport_options=broker_transport_options, + result_backend_transport_options=result_backend_transport_options, +) if __name__ == '__main__': app.start() diff --git a/rootfs/helmbroker/cleaner.py b/rootfs/helmbroker/cleaner.py index eeeb5ab..6cd456a 100644 --- a/rootfs/helmbroker/cleaner.py +++ b/rootfs/helmbroker/cleaner.py @@ -6,7 +6,8 @@ from openbrokerapi.service_broker import OperationState from .config import INSTANCES_PATH -from .utils import get_instance_file, load_instance_meta +from .database.query import get_instance_file +from .database.metadata import load_instance_meta logger = logging.getLogger(__name__) diff --git a/rootfs/helmbroker/config.py b/rootfs/helmbroker/config.py index 682d58a..c83ad94 100644 --- a/rootfs/helmbroker/config.py +++ b/rootfs/helmbroker/config.py @@ -1,14 +1,18 @@ import os -HELMBROKER_ROOT = os.environ.get("HELMBROKER_ROOT", '/etc/helmbroker') -ADDONS_PATH = os.path.join(HELMBROKER_ROOT, 'addons') -CONFIG_PATH = os.path.join(HELMBROKER_ROOT, 'config') -INSTANCES_PATH = os.path.join(HELMBROKER_ROOT, 'instances') +CONFIG_ROOT = os.environ.get("HELMBROKER_CONFIG_ROOT", '/etc/helmbroker') +ADDONS_PATH = os.path.join(CONFIG_ROOT, 'addons') +CONFIG_PATH = os.path.join(CONFIG_ROOT, 'config') +INSTANCES_PATH = os.path.join(CONFIG_ROOT, 'instances') -USERNAME = os.environ.get('USERNAME') -PASSWORD = os.environ.get('PASSWORD') + +USERNAME = os.environ.get('HELMBROKER_USERNAME') +PASSWORD = os.environ.get('HELMBROKER_PASSWORD') + +VALKEY_URL = os.environ.get("HELMBROKER_VALKEY_URL", 'redis://localhost:6379/0') +INSTANCE_NAME_LENS = int(os.environ.get("INSTANCE_NAME_LENS", '32')) class Config: - DEBUG = bool(os.environ.get('DRYCC_DEBUG', True)) + DEBUG = bool(os.environ.get('HELMBROKER_DEBUG', False)) diff --git a/rootfs/helmbroker/database/__init__.py b/rootfs/helmbroker/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rootfs/helmbroker/database/fetch.py b/rootfs/helmbroker/database/fetch.py new file mode 100644 index 0000000..d2d8102 --- /dev/null +++ b/rootfs/helmbroker/database/fetch.py @@ -0,0 +1,100 @@ +import os +import tarfile +import requests +import yaml +import json +import glob +import shutil +import tempfile +import collections + + +def fetch_addons(repository): + if not repository: + return + temp = tempfile.TemporaryDirectory() + try: + index_name = repository['url'].split('/')[-1] + # download index.yaml + remote_index = requests.get(repository['url']).content.decode(encoding="utf-8") + # new index + with open(f"{temp.name}/{index_name}", 'w') as f: + f.write(remote_index) + remote_index = yaml.load(remote_index, Loader=yaml.Loader) + # save index.yaml addons + download_urls = {} + for addon_name, addon_metas in remote_index.get('entries', {}).items(): + download_urls[addon_name] = {} + for addon_meta in addon_metas: + url = "/".join(repository["url"].split("/")[0:-1]) + meta_name = f'{addon_name}-{addon_meta["version"]}' + addon_tgz_url = f'{url}/{meta_name}.tgz' + _fetch_addon(addon_tgz_url, f'{temp.name}/{meta_name}') + return _read_addons_meta(temp.name) + finally: + temp.cleanup() + + +def fetch_chart_plan(addon_id, chart_path, plan_id, plan_path): + from .query import get_addon_meta + addon_meta = get_addon_meta(addon_id) + addon_plan = [plan for plan in addon_meta['plans'] if plan['id'] == plan_id][0] + temp = tempfile.TemporaryDirectory() + try: + _fetch_addon(addon_meta['url'], temp.name) + shutil.rmtree(chart_path, ignore_errors=True) + shutil.rmtree(plan_path, ignore_errors=True) + shutil.copytree(os.path.join(temp.name, "chart", addon_meta["name"]), chart_path) + shutil.copytree(os.path.join(temp.name, "plans", addon_plan['name']), plan_path) + finally: + temp.cleanup() + + +def _fetch_addon(url, dest): + with tempfile.TemporaryFile(suffix=".tgz") as tgz_file: + tgz_file.write(requests.get(url).content) + tgz_file.flush() + tgz_file.seek(0) + os.makedirs(dest, exist_ok=True) + with tarfile.open(fileobj=tgz_file, mode="r:gz") as tarobj: + for tarinfo in tarobj: + tarobj.extract(tarinfo.name, dest, filter='data') + filename1 = os.path.join(dest, "meta.yaml") + with open(filename1, "r") as f1: + meta = yaml.load(stream=f1, Loader=yaml.Loader) + meta['url'] = url + meta['version'] = str(meta['version']) + meta['tags'] = [tag.strip() for tag in meta.get('tags').split(',') if tag.strip()] + with open(os.path.join(dest, "meta.json"), "w") as f2: + json.dump(meta, f2) + os.remove(filename1) + + +def _read_addons_meta(addons_path): + addons_meta = collections.OrderedDict() + for metafile in glob.glob(os.path.join(addons_path, "*", "meta.json")): + with open(metafile) as f1: + meta = json.load(f1) + meta['plans'] = [] + metapath = os.path.join(os.path.dirname(metafile)) + for planfile in glob.glob(os.path.join(metapath, "plans", "*", "meta.yaml")): + with open(planfile, 'r') as f2: + plan = yaml.load(f2.read(), Loader=yaml.Loader) + meta["plans"].append(plan) + addons_meta[meta['displayName']] = meta + return addons_meta + + +def main(): + from ..config import CONFIG_PATH + from .metadata import save_addons_meta + addons_meta = collections.OrderedDict() + with open(f'{CONFIG_PATH}/repositories', 'r') as f: + repositories = yaml.load(f.read(), Loader=yaml.Loader) + for repository in repositories: + addons_meta.update(fetch_addons(repository)) + save_addons_meta(addons_meta) + + +if __name__ == '__main__': + main() diff --git a/rootfs/helmbroker/database/metadata.py b/rootfs/helmbroker/database/metadata.py new file mode 100644 index 0000000..2877aaa --- /dev/null +++ b/rootfs/helmbroker/database/metadata.py @@ -0,0 +1,184 @@ +import os +import json +import time +import logging +import jsonschema + +from ..utils import get_valkey_client +from ..config import ADDONS_PATH + +logger = logging.getLogger(__name__) + +INSTANCE_META_SCHEMA = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "details": { + "type": "object", + "properties": { + "service_id": {"type": "string"}, + "plan_id": {"type": "string"}, + "context": {"type": "object"}, + "parameters": { + 'oneOf': [{'type': 'object'}, {'type': 'null'}] + }, + }, + "required": [ + "service_id", "plan_id", "context" + ] + }, + "last_operation": { + "type": "object", + "properties": { + "state": {"type": "string"}, + "operation": {"type": "string"}, + "description": {"type": "string"} + } + }, + "last_modified_time": {"type": "number"} + }, +} + +BINDING_META_SCHEMA = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "credentials": { + "type": "object", + }, + "last_operation": { + "type": "object", + "properties": { + "state": {"type": "string"}, + "description": {"type": "string"} + } + }, + "last_modified_time": {"type": "number"} + } +} + +ADDONS_META_SCHEMA = { + "type": "object", + "patternProperties": { + ".*": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "name": {"type": "string"}, + "version": {"type": "string"}, + "bindable": {"type": "boolean"}, + "instances_retrievable": {"type": "boolean"}, + "bindings_retrievable": {"type": "boolean"}, + "allow_context_updates": {"type": "boolean"}, + "description": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}}, + "requires": {"type": "array"}, + "metadata": {"type": "object"}, + "plan_updateable": {"type": "boolean"}, + "dashboard_client": {"type": "object"}, + "plans": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "name": {"type": "string"}, + "description": {"type": "string"}, + "metadata": {"type": "object"}, + "free": {"type": "boolean"}, + "bindable": {"type": "boolean"}, + "binding_rotatable": {"type": "boolean"}, + "plan_updateable": {"type": "boolean"}, + "schemas": {"type": "object"}, + "maximum_polling_duration": {"type": "integer"}, + "maintenance_info": {"type": "object"}, + }, + "required": ["id", "name", "description"] + }, + "minItems": 1, + + }, + }, + "required": ["id", "name", "description", "bindable", "version", "plans"] + }, + }, + "additionalProperties": False, +} + + +def save_instance_meta(instance_id, data): + cache_key = f"helmbroker:instance:{instance_id}" + from .query import get_instance_file + data["last_modified_time"] = time.time() + file = get_instance_file(instance_id) + jsonschema.validate(instance=data, schema=INSTANCE_META_SCHEMA) + + json_data = json.dumps(data, sort_keys=True, indent=2) + with open(file, "w") as f: + f.write(json_data) + get_valkey_client().set(cache_key, json_data) + + +def load_instance_meta(instance_id): + cache_key = f"helmbroker:instance:{instance_id}" + valkey = get_valkey_client() + + json_data = valkey.get(cache_key) + if not json_data: + from .query import get_instance_file + file = get_instance_file(instance_id) + with open(file) as f: + json_data = f.read() + valkey.set(cache_key, json_data) + return json.loads(json_data) + + +def save_binding_meta(instance_id, data): + from .query import get_binding_file + cache_key = f"helmbroker:binding:{instance_id}" + data["last_modified_time"] = time.time() + file = get_binding_file(instance_id) + jsonschema.validate(instance=data, schema=BINDING_META_SCHEMA) + + json_data = json.dumps(data, sort_keys=True, indent=2) + with open(file, "w") as f: + f.write(json_data) + get_valkey_client().set(cache_key, json_data) + + +def load_binding_meta(instance_id): + from .query import get_binding_file + cache_key = f"helmbroker:binding:{instance_id}" + valkey = get_valkey_client() + json_data = valkey.get(cache_key) + if not json_data: + file = get_binding_file(instance_id) + with open(file, 'r') as f: + json_data = f.read() + valkey.set(cache_key, json_data) + return json.loads(json_data) + + +def save_addons_meta(data): + cache_key = "helmbroker:addons" + os.makedirs(ADDONS_PATH, exist_ok=True) + file = os.path.join(ADDONS_PATH, "addons.json") + jsonschema.validate(instance=data, schema=ADDONS_META_SCHEMA) + + json_data = json.dumps(data, sort_keys=True, indent=2) + with open(file, "w") as f: + f.write(json_data) + get_valkey_client().set(cache_key, json_data) + + +def load_addons_meta(): + cache_key = "helmbroker:addons" + valkey = get_valkey_client() + + json_data = valkey.get(cache_key) + if not json_data: + file = os.path.join(ADDONS_PATH, "addons.json") + with open(file, 'r') as f: + json_data = f.read() + valkey.set(cache_key, json_data) + return json.loads(json_data) diff --git a/rootfs/helmbroker/database/query.py b/rootfs/helmbroker/database/query.py new file mode 100644 index 0000000..741cfc3 --- /dev/null +++ b/rootfs/helmbroker/database/query.py @@ -0,0 +1,116 @@ +import os +import base64 + +from ..utils import command +from ..config import INSTANCES_PATH +from .metadata import load_addons_meta + + +def get_instance_path(instance_id): + return os.path.join(INSTANCES_PATH, instance_id) + + +def get_instance_file(instance_id): + return os.path.join(get_instance_path(instance_id), "instance.json") + + +def get_chart_path(instance_id): + return os.path.join(get_instance_path(instance_id), "chart") + + +def get_plan_path(instance_id): + return os.path.join(get_instance_path(instance_id), "plan") + + +def get_plan_schema_path(instance_id): + return os.path.join(get_instance_path(instance_id), "plan", "instance-schema.json") + + +def get_hooks_path(instance_id): + return os.path.join(get_plan_path(instance_id), "hooks") + + +def get_hooks_result_file(instance_id): + return os.path.join(get_instance_path(instance_id), "hooks-result.json") + + +def get_binding_file(instance_id): + return os.path.join(get_instance_path(instance_id), "binding.json") + + +def get_backups_path(instance_id): + return os.path.join(get_instance_path(instance_id), "backups") + + +def get_addon_values_file(instance_id): + return os.path.join(get_instance_path(instance_id), "addon-values.yaml") + + +def get_custom_addon_values_file(instance_id): + return os.path.join(get_instance_path(instance_id), "custom-addon-values.yaml") + + +def get_addon_updateable(addon_id): + addon_meta = get_addon_meta(addon_id) + return addon_meta.get('plan_updateable', False) + + +def get_addon_bindable(addon_id): + addon_meta = get_addon_meta(addon_id) + return addon_meta.get('bindable', False) + + +def get_addon_allow_params(addon_id): + addon_meta = get_addon_meta(addon_id) + return addon_meta.get('allow_parameters', []) + + +def get_addon_archive(addon_id): + addon_meta = get_addon_meta(addon_id) + return addon_meta.get('archive', False) + + +def get_cred_value(ns, source): + if source.get('serviceRef'): + return _get_service_key_value(ns, source['serviceRef']) + if source.get('configMapRef'): + return _get_config_map_key_value(ns, source['configMapRef']) + if source.get('secretKeyRef'): + return _get_secret_key_value(ns, source['secretKeyRef']) + return -1, 'invalid valueFrom' + + +def get_addon_meta(addon_id): + addons_meta = load_addons_meta() + addons_meta = [ + addon for addon in [addons for _, addons in addons_meta.items()] + if addon['id'] == addon_id + ] + return addons_meta[0] if len(addons_meta) > 0 else None + + +def _get_service_key_value(ns, service_ref): + args = [ + "get", "svc", service_ref['name'], "-n", ns, + '-o', f"jsonpath=\'{service_ref['jsonpath']}\'", + ] + return command("kubectl", *args) + + +def _get_config_map_key_value(ns, config_map_ref): + args = [ + "get", "cm", config_map_ref['name'], "-n", ns, + '-o', f"jsonpath=\'{config_map_ref['jsonpath']}\'", + ] + return command("kubectl", *args) + + +def _get_secret_key_value(ns, secret_ref): + args = [ + "get", "secret", secret_ref['name'], "-n", ns, '-o', + f"jsonpath=\'{secret_ref['jsonpath']}\'", + ] + status, output = command("kubectl", *args) + if status == 0: + output = base64.b64decode(output).decode() + return status, output diff --git a/rootfs/helmbroker/database/savepoint.py b/rootfs/helmbroker/database/savepoint.py new file mode 100644 index 0000000..ba97960 --- /dev/null +++ b/rootfs/helmbroker/database/savepoint.py @@ -0,0 +1,65 @@ +import os +import json +import logging +import yaml +import shutil +import datetime + +from ..config import CONFIG_PATH +from .query import get_instance_path, get_backups_path, get_addon_meta, get_addon_values_file, \ + get_custom_addon_values_file, get_hooks_result_file + +logger = logging.getLogger(__name__) + + +def backup_instance(instance_id): + now = datetime.datetime.now(datetime.timezone.utc) + backup_path = os.path.join(get_backups_path(instance_id), now.isoformat()) + os.makedirs(backup_path, exist_ok=True) + + hooks_result_file = get_hooks_result_file(instance_id) + if os.path.exists(hooks_result_file): + shutil.copy(hooks_result_file, backup_path) + addon_values_file = get_addon_values_file(instance_id) + if os.path.exists(addon_values_file): + shutil.copy(addon_values_file, backup_path) + custom_addon_values_file = get_custom_addon_values_file(instance_id) + if os.path.exists(custom_addon_values_file): + shutil.copy(custom_addon_values_file, backup_path) + + instance_path = get_instance_path(instance_id) + shutil.copytree(os.path.join(instance_path, "plan"), os.path.join(backup_path, "plan")) + shutil.copytree(os.path.join(instance_path, "chart"), os.path.join(backup_path, "chart")) + + +def save_raw_values(instance_id, data): + file = get_custom_addon_values_file(instance_id) + with open(file, "w") as f: + f.write(data) + return file + + +def save_addon_values(service_id, instance_id): + file = get_addon_values_file(instance_id) + service = get_addon_meta(service_id) + logger.debug(f"save_addon_values service: {service}") + if not os.path.exists(f'{CONFIG_PATH}/addon-values'): + return None + with open(file, "w") as fw: + with open(f'{CONFIG_PATH}/addon-values', 'r') as f: + addons_values = yaml.load(f.read(), Loader=yaml.Loader) + logger.debug(f"save_addon_values addons_values: {addons_values}") + addon_values = addons_values.get(service["name"], {}).\ + get(service["version"], {}) + logger.debug(f"save_addon_values addon_values: {addon_values}") + if not addon_values: + return None + fw.write(yaml.dump(addon_values)) + return file + + +def save_hooks_result(instance_id, data): + file = get_hooks_result_file(instance_id) + with open(file, "w") as f: + f.write(json.dumps(data, sort_keys=True, indent=2)) + return file diff --git a/rootfs/helmbroker/gunicorn/config.py b/rootfs/helmbroker/gunicorn/config.py index 4038c74..716f9e5 100644 --- a/rootfs/helmbroker/gunicorn/config.py +++ b/rootfs/helmbroker/gunicorn/config.py @@ -18,14 +18,14 @@ def worker_int(worker): - """Print a stack trace when a worker receives a SIGINT or SIGQUIT signal.""" # noqa + """Print a stack trace when a worker receives a SIGINT or SIGQUIT signal.""" worker.log.warning('worker terminated') import traceback traceback.print_stack() def worker_abort(worker): - """Print a stack trace when a worker receives a SIGABRT signal, generally on timeout.""" # noqa + """Print a stack trace when a worker receives a SIGABRT signal, generally on timeout.""" worker.log.warning('worker aborted') import traceback traceback.print_stack() diff --git a/rootfs/helmbroker/gunicorn/logging.py b/rootfs/helmbroker/gunicorn/logging.py index f9e65f3..41a3f9e 100644 --- a/rootfs/helmbroker/gunicorn/logging.py +++ b/rootfs/helmbroker/gunicorn/logging.py @@ -1,12 +1,12 @@ -import os from gunicorn.glogging import Logger +from helmbroker.config import Config class Logging(Logger): def access(self, resp, req, environ, request_time): # health check endpoints are only logged in debug mode if ( - not os.environ.get('DEBUG', False) and + not Config.DEBUG and req.path in ['/readiness', '/healthz'] ): return diff --git a/rootfs/helmbroker/loader.py b/rootfs/helmbroker/loader.py deleted file mode 100644 index abdec39..0000000 --- a/rootfs/helmbroker/loader.py +++ /dev/null @@ -1,120 +0,0 @@ -import os -import shutil -import tarfile -import requests -import yaml - -from .config import ADDONS_PATH, CONFIG_PATH -from .utils import dump_addons_meta - - -def download_file(url, dest): - if not os.path.exists(dest): - os.system(f'mkdir -p {dest}') - filename = url.split('/')[-1] - file = requests.get(url) - with open(f"{dest}/{filename}", 'wb') as f: - f.write(file.content) - if filename.endswith(".yaml") or filename.endswith(".yml"): - return yaml.load(file.content.decode(encoding="utf-8"), - Loader=yaml.Loader) - - -def read_file(filename): - if not os.path.exists(filename): - return - with open(filename, 'r') as f: - file_content = f.read() - return file_content - - -def save_file(content, dest, filename): - if not os.path.exists(dest): - os.system(f'mkdir -p {dest}') - with open(f"{dest}/{filename}", 'w') as f: - f.write(content) - - -def extract_tgz(tgz_file, dest): - if not os.path.exists(tgz_file): - return - if not os.path.exists(dest): - os.system(f'mkdir -p {dest}') - - tarobj = tarfile.open(tgz_file, "r:gz") - for tarinfo in tarobj: - tarobj.extract(tarinfo.name, dest) - tarobj.close() - - -def addons_meta_file(): - meta_files = [] - # get meta.yaml - for root, dirnames, filenames in os.walk(ADDONS_PATH): - for filename in filenames: - if filename == 'meta.yaml': - meta_files.append(os.path.join(root, filename)) - meta_files = [meta_file.split(ADDONS_PATH)[1] for meta_file in meta_files] - addons_meta = [] - plans_meta = [] - for meta_file in meta_files: - if len(meta_file.split('/')) == 3: - addons_meta.append(meta_file.split('/')[1:]) - else: - plans_meta.append(meta_file.split('/')[1:]) - addons_dict = {} - for addon_meta in addons_meta: - with open(f'{ADDONS_PATH}/{"/".join(addon_meta)}', 'r') as f: - meta = yaml.load(f.read(), Loader=yaml.Loader) - meta['tags'] = meta.get('tags').split(', ') if meta.get('tags') else [] # noqa - meta['plans'] = [] - addons_dict[meta['displayName']] = meta - addon_plans_meta = [] - for plan_meta in plans_meta: - if plan_meta[0] == meta['displayName']: - addon_plans_meta.append(plan_meta) - elif f'{"-".join(plan_meta[0].split("-")[0:-1])}' == meta['displayName']: # noqa - addon_plans_meta.append(plan_meta) - for addon_plan_meta in addon_plans_meta: - with open(f'{ADDONS_PATH}/{"/".join(addon_plan_meta)}', 'r') as f: - addons_mata = yaml.load(f.read(), Loader=yaml.Loader) - addons_dict[meta['displayName']]['plans'].append(addons_mata) # noqa - dump_addons_meta(addons_dict) - - -def load_addons(repository): - if not repository: - return - index_name = repository['url'].split('/')[-1] - local_index_file = f'{ADDONS_PATH}/{index_name}' - # download index.yaml - remote_index = requests.get(repository['url']).content.decode( - encoding="utf-8") - # compare index.yaml, is update - local_index = read_file(local_index_file) - if local_index and remote_index == local_index: - return - # delete old repository catalog - if os.path.exists(ADDONS_PATH): - shutil.rmtree(ADDONS_PATH, ignore_errors=True) - else: - os.makedirs(ADDONS_PATH, exist_ok=True) - # new index - save_file(remote_index, ADDONS_PATH, index_name) - remote_index = yaml.load(remote_index, Loader=yaml.Loader) - # save index.yaml addons - for addon_name, v in remote_index.get('entries', {}).items(): - for _ in v: - url = "/".join(repository["url"].split("/")[0:-1]) - tgz_name = f'{addon_name}-{_["version"]}' - addon_tgz_url = f'{url}/{tgz_name}.tgz' - download_file(addon_tgz_url, ADDONS_PATH) - extract_tgz(f'{ADDONS_PATH}/{tgz_name}.tgz', - f'{ADDONS_PATH}/{tgz_name}') - addons_meta_file() - - -if __name__ == '__main__': - with open(f'{CONFIG_PATH}/repositories', 'r') as f: - repositories = yaml.load(f.read(), Loader=yaml.Loader) - load_addons(repositories[0]) diff --git a/rootfs/helmbroker/tasks.py b/rootfs/helmbroker/tasks.py index 380d306..69f2f30 100644 --- a/rootfs/helmbroker/tasks.py +++ b/rootfs/helmbroker/tasks.py @@ -1,141 +1,134 @@ import os import time -import shutil import yaml import logging +import shutil from openbrokerapi.service_broker import ProvisionDetails, OperationState, \ UpdateDetails, BindDetails from .celery import app -from .utils import get_plan_path, get_chart_path, get_cred_value, \ - InstanceLock, dump_instance_meta, dump_binding_meta, load_instance_meta, \ - get_instance_file, helm, dump_addon_values, format_paras_to_helm_args +from .utils import helm, format_params_to_helm_args, new_instance_lock, run_instance_hooks + +from .database.metadata import save_instance_meta, save_binding_meta, load_instance_meta, \ + load_binding_meta +from .database.savepoint import save_addon_values, backup_instance +from .database.query import get_plan_path, get_chart_path, get_cred_value, get_binding_file logger = logging.getLogger(__name__) @app.task(serializer='pickle') def provision(instance_id: str, details: ProvisionDetails): - with InstanceLock(instance_id): + logger.debug(f"*** task provision instance: {instance_id}, before lock") + with ( + new_instance_lock(instance_id), + run_instance_hooks(instance_id, "provision") as (status, output) + ): + logger.debug(f"*** task provision instance: {instance_id}") + backup_instance(instance_id) # create instance.json - dump_instance_meta(instance_id, { - "id": instance_id, + data = { + "id": instance_id, "last_operation": {}, "details": { - "service_id": details.service_id, - "plan_id": details.plan_id, + "service_id": details.service_id, "plan_id": details.plan_id, "context": details.context, "parameters": details.parameters if details.parameters else {}, }, - "last_operation": { - "state": OperationState.IN_PROGRESS.value, - "operation": "provision", - "description": ( - "provision %s in progress at %s" % ( - instance_id, time.time())) - } - }) - + } + if status != 0: + data["last_operation"]["state"] = OperationState.FAILED.value + data["last_operation"]["description"] = f"provision {instance_id} error: {output}" + save_instance_meta(instance_id, data) + return + data["last_operation"]["state"] = OperationState.IN_PROGRESS.value + data["last_operation"]["operation"] = "provision" + data["last_operation"]["description"] = ( + f"provision {instance_id} in progress at {time.time()}") + save_instance_meta(instance_id, data) chart_path = get_chart_path(instance_id) bind_yaml = f'{chart_path}/templates/bind.yaml' if os.path.exists(bind_yaml): os.remove(bind_yaml) if os.path.exists(f'{chart_path}/Chart.yaml'): - args = [ - "dependency", - "update", - chart_path, - ] + args = ["dependency", "update", chart_path] helm(instance_id, *args) values_file = os.path.join(get_plan_path(instance_id), "values.yaml") args = [ - "install", - details.context["instance_name"], - chart_path, - "--namespace", - details.context["namespace"], - "--create-namespace", - "--wait", - "--timeout", - "25m0s", - "-f", - values_file, - "--set", - f"fullnameOverride=helmbroker-{details.context['instance_name']}" + "install", details.context["instance_name"], chart_path, + "--namespace", details.context["namespace"], "--create-namespace", + "--wait", "--timeout", "25m0s", "-f", values_file, + "--set", f"fullnameOverride=helmbroker-{details.context['instance_name']}" ] - addon_values_file = dump_addon_values(details.service_id, instance_id) + addon_values_file = save_addon_values(details.service_id, instance_id) if addon_values_file: args.insert(9, "-f") args.insert(10, addon_values_file) - logger.debug(f"helm install parameters :{details.parameters}") - args = format_paras_to_helm_args(instance_id, details.parameters, args) - logger.debug(f"helm install args:{args}") + logger.debug(f"helm install parameters: {details.parameters}") + args = format_params_to_helm_args(instance_id, details.parameters, args) + logger.debug(f"helm install args: {args}") status, output = helm(instance_id, *args) - data = load_instance_meta(instance_id) if status != 0: data["last_operation"]["state"] = OperationState.FAILED.value - data["last_operation"]["description"] = ( - "provision error:\n%s" % output) + data["last_operation"]["description"] = f"provision {instance_id} error: {output}" else: data["last_operation"]["state"] = OperationState.SUCCEEDED.value data["last_operation"]["description"] = ( - "provision succeeded at %s" % time.time()) - dump_instance_meta(instance_id, data) + f"provision {instance_id} succeeded at {time.time()}") + save_instance_meta(instance_id, data) @app.task(serializer='pickle') def update(instance_id: str, details: UpdateDetails): - data = load_instance_meta(instance_id) - if details.service_id: - data['details']['service_id'] = details.service_id - if details.plan_id: - data['details']['service_id'] = details.plan_id - if details.context: - data['details']['context'] = details.context - if details.parameters: - paras = data['details']['parameters'] - paras.update(details.parameters) - # remove the key which value is null - data['details']['parameters'] = {k: v for k, v in paras.items() if v != ""} # noqa - data['last_operation']["state"] = OperationState.IN_PROGRESS.value - data['last_operation']["description"] = "update %s in progress at %s" % (instance_id, time.time()) # noqa - dump_instance_meta(instance_id, data) - chart_path = get_chart_path(instance_id) - values_file = os.path.join(get_plan_path(instance_id), "values.yaml") - args = [ - "upgrade", - details.context["instance_name"], - chart_path, - "--namespace", - details.context["namespace"], - "--create-namespace", - "--wait", - "--timeout", - "25m0s", - "--reuse-values", - "-f", - values_file, - "--set", - f"fullnameOverride=helmbroker-{details.context['instance_name']}" - ] - addon_values_file = dump_addon_values(details.service_id, instance_id) - if addon_values_file: - args.insert(10, "-f") - args.insert(11, addon_values_file) - paras = data['details']['parameters'] - logger.debug(f"helm upgrade parameters: {paras}") - args = format_paras_to_helm_args(instance_id, paras, args) - logger.debug(f"helm upgrade args:{args}") - status, output = helm(instance_id, *args) - if status != 0: - data["last_operation"]["state"] = OperationState.FAILED.value - data["last_operation"]["description"] = ( - "update %s failed: %s" % (instance_id, output)) - else: - data["last_operation"]["state"] = OperationState.SUCCEEDED.value - data["last_operation"]["description"] = ( - "update %s succeeded at %s" % (instance_id, time.time())) - dump_instance_meta(instance_id, data) + logger.debug(f"*** task update instance: {instance_id}, before lock") + with ( + new_instance_lock(instance_id), + run_instance_hooks(instance_id, "update") as (status, output) + ): + logger.debug(f"*** task update instance: {instance_id}") + backup_instance(instance_id) + data = load_instance_meta(instance_id) + if details.service_id: + data['details']['service_id'] = details.service_id + if details.plan_id: + data['details']['service_id'] = details.plan_id + if details.context: + data['details']['context'] = details.context + if details.parameters: + params = data['details']['parameters'] + params.update(details.parameters) + # remove the key which value is null + data['details']['parameters'] = {k: v for k, v in params.items() if v != ""} + if status != 0: + data["last_operation"]["state"] = OperationState.FAILED.value + data["last_operation"]["description"] = f"update {instance_id} failed: {output}" + save_instance_meta(instance_id, data) + return + chart_path = get_chart_path(instance_id) + values_file = os.path.join(get_plan_path(instance_id), "values.yaml") + args = [ + "upgrade", details.context["instance_name"], chart_path, + "--namespace", details.context["namespace"], "--create-namespace", + "--wait", "--timeout", "25m0s", "--reset-then-reuse-values", "-f", values_file, + "--set", f"fullnameOverride=helmbroker-{details.context['instance_name']}" + ] + addon_values_file = save_addon_values(details.service_id, instance_id) + if addon_values_file: + args.insert(10, "-f") + args.insert(11, addon_values_file) + params = data['details']['parameters'] + logger.debug(f"helm upgrade parameters: {params}") + args = format_params_to_helm_args(instance_id, params, args) + logger.debug(f"helm upgrade args: {args}") + status, output = helm(instance_id, *args) + if status != 0: + data["last_operation"]["state"] = OperationState.FAILED.value + data["last_operation"]["description"] = f"update {time.time()} failed: {output}" + else: + data["last_operation"]["state"] = OperationState.SUCCEEDED.value + data["last_operation"]["description"] = ( + f"update {instance_id} succeeded at {time.time()}") + save_instance_meta(instance_id, data) @app.task(serializer='pickle') @@ -144,96 +137,127 @@ def bind(instance_id: str, details: BindDetails, async_allowed: bool, **kwargs): - data = { - "binding_id": binding_id, - "credentials": {}, - "last_operation": { - "state": OperationState.IN_PROGRESS.value, - "description": ( - "binding %s in progress at %s" % (binding_id, time.time())) - } - } - dump_binding_meta(instance_id, data) + logger.debug(f"*** task bind instance: {instance_id}, before lock") + with ( + new_instance_lock(instance_id), + run_instance_hooks(instance_id, "bind") as (status, output) + ): + logger.debug(f"*** task bind instance: {instance_id}") + backup_instance(instance_id) + data = {"binding_id": binding_id, "credentials": {}, "last_operation": {}} + if status != 0: + data["last_operation"]["state"] = OperationState.FAILED.value + data["last_operation"]["description"] = f"binding {instance_id} failed: {output}" + save_binding_meta(instance_id, data) + return + save_binding_meta(instance_id, data) + chart_path, plan_path = ( + get_chart_path(instance_id), get_plan_path(instance_id)) + shutil.copy(f'{plan_path}/bind.yaml', f'{chart_path}/templates') + values_file = os.path.join(get_plan_path(instance_id), "values.yaml") + args = [ + "template", details.context["instance_name"], chart_path, + "-f", values_file, + "--set", f"fullnameOverride=helmbroker-{details.context['instance_name']}", + "--namespace", details.context["namespace"], + ] + instance_data = load_instance_meta(instance_id) + params = instance_data["details"]["parameters"] + logger.debug(f"helm template parameters: {params}") + args = format_params_to_helm_args(instance_id, params, args) + logger.debug(f"helm template args: {args}") + status, templates = helm(instance_id, *args) # output: templates.yaml + if status != 0: + data["last_operation"]["state"] = OperationState.FAILED.value + data["last_operation"]["description"] = f"binding {instance_id} failed: {templates}" + credential_template = {} + templates = yaml.load_all(templates, Loader=yaml.SafeLoader) + credential_template = next( + (item for item in templates if isinstance(item, dict) and "credential" in item), {} + ) + success_flag = True + errors = [] + for _ in credential_template.get('credential', {}): + if _.get('valueFrom'): + status, val = get_cred_value(details.context["namespace"], _['valueFrom']) + elif _.get('value'): + status, val = 0, _['value'] + else: + status, val = -1, 'invalid value' + if status != 0: + success_flag = False + errors.append(val) + else: + data['credentials'][_['name']] = val + if success_flag: + data['last_operation']['state'] = OperationState.SUCCEEDED.value + data['last_operation']['description'] = ( + f"binding {instance_id} succeeded at {time.time()}") + else: + data['last_operation']['state'] = OperationState.FAILED.value + data['last_operation']['description'] = ( + f"binding {instance_id} failed: {','.join(errors)}") + bind_yaml = f'{chart_path}/templates/bind.yaml' + if os.path.exists(bind_yaml): + os.remove(bind_yaml) + save_binding_meta(instance_id, data) - chart_path = get_chart_path(instance_id) - values_file = os.path.join(get_plan_path(instance_id), "values.yaml") - args = [ - "template", - details.context["instance_name"], - chart_path, - "-f", - values_file, - "--set", - f"fullnameOverride=helmbroker-{details.context['instance_name']}" - ] - instance_data = load_instance_meta(instance_id) - paras = instance_data["details"]["parameters"] - logger.debug(f"helm template parameters: {paras}") - args = format_paras_to_helm_args(instance_id, paras, args) - logger.debug(f"helm template args: {args}") - status, templates = helm(instance_id, *args) # output: templates.yaml - if status != 0: - data["last_operation"]["state"] = OperationState.FAILED.value - data["last_operation"]["description"] = "binding %s failed: %s" % (instance_id, templates) # noqa - credential_template = yaml.load(templates.split('bind.yaml')[1], Loader=yaml.Loader) # noqa - success_flag = True - errors = [] - for _ in credential_template['credential']: - if _.get('valueFrom'): - status, val = get_cred_value(details.context["namespace"], _['valueFrom']) # noqa - elif _.get('value'): - status, val = 0, _['value'] - else: - status, val = -1, 'invalid value' +@app.task(serializer='pickle') +def unbind(instance_id): + logger.debug(f"*** task unbind instance: {instance_id}, before lock") + with ( + new_instance_lock(instance_id), + run_instance_hooks(instance_id, "unbind") as (status, output) + ): + logger.debug(f"*** task unbind instance: {instance_id}") + backup_instance(instance_id) + data = load_binding_meta(instance_id) if status != 0: - success_flag = False - errors.append(val) - else: - data['credentials'][_['name']] = val - if success_flag: - data['last_operation'] = { - 'state': OperationState.SUCCEEDED.value, - 'description': "binding %s succeeded at %s" % (instance_id, time.time()) # noqa - } - else: - data['last_operation'] = { - 'state': OperationState.FAILED.value, - 'description': "binding %s failed: %s" % (instance_id, ','.join(errors)) # noqa - } - dump_binding_meta(instance_id, data) - bind_yaml = f'{chart_path}/templates/bind.yaml' - if os.path.exists(bind_yaml): - os.remove(bind_yaml) + data['last_operation']['state'] = OperationState.FAILED.value + data['last_operation']['description'] = f"unbind {instance_id} failed: {output}" + save_binding_meta(instance_id, data) + return + binding_file = get_binding_file(instance_id) + if os.path.exists(binding_file): + os.remove(binding_file) + data['last_operation']['state'] = OperationState.SUCCEEDED.value + data['last_operation']['description'] = f"unbind {instance_id} succeeded at {time.time()}" + save_binding_meta(instance_id, data) -@app.task() +@app.task(serializer='pickle') def deprovision(instance_id: str): - with InstanceLock(instance_id): - shutil.copy(get_instance_file(instance_id), "%s.%s" % ( - get_instance_file(instance_id), time.time() - )) + logger.debug(f"*** task deprovision instance: {instance_id}, before lock") + with ( + new_instance_lock(instance_id), + run_instance_hooks(instance_id, "deprovision") as (status, output) + ): + logger.debug(f"*** task deprovision instance: {instance_id}") + backup_instance(instance_id) data = load_instance_meta(instance_id) + if status != 0: + data["last_operation"]["operation"] = "deprovision" + data["last_operation"]["state"] = OperationState.FAILED.value + data["last_operation"]["description"] = f"deprovision {instance_id} failed: {output}" + save_instance_meta(instance_id, data) + return data["last_operation"]["operation"] = "deprovision" data["last_operation"]["state"] = OperationState.IN_PROGRESS.value data["last_operation"]["description"] = ( - "deprovision %s in progress at %s" % (instance_id, time.time())) - dump_instance_meta(instance_id, data) + f"deprovision {instance_id} in progress at {time.time()}") + save_instance_meta(instance_id, data) args = [ - "uninstall", - data["details"]["context"]["instance_name"], - "--namespace", - data["details"]["context"]["namespace"], + "uninstall", data["details"]["context"]["instance_name"], + "--namespace", data["details"]["context"]["namespace"], ] logger.debug(f"helm uninstall args: {args}") status, output = helm(instance_id, *args) if status != 0: data["last_operation"]["state"] = OperationState.FAILED.value - data["last_operation"]["description"] = ( - "deprovision error:\n%s" % output) + data["last_operation"]["description"] = f"deprovision {instance_id} failed: {output}" else: - data["last_operation"]["state"] = ( - OperationState.SUCCEEDED.value) + data["last_operation"]["state"] = OperationState.SUCCEEDED.value data["last_operation"]["description"] = ( - "deprovision succeeded at %s" % time.time()) - dump_instance_meta(instance_id, data) + f"deprovision {instance_id} succeeded at {time.time()}") + save_instance_meta(instance_id, data) diff --git a/rootfs/helmbroker/utils.py b/rootfs/helmbroker/utils.py index a2f6d31..e749e85 100644 --- a/rootfs/helmbroker/utils.py +++ b/rootfs/helmbroker/utils.py @@ -1,111 +1,25 @@ import os -import fcntl import yaml import json import subprocess -import time import base64 import copy import logging - -from jsonschema import validate -from .config import INSTANCES_PATH, ADDONS_PATH, CONFIG_PATH +import jsonschema +from urllib.parse import urlparse, parse_qs +from contextlib import contextmanager +from redis.client import Redis +from redis.sentinel import Sentinel +from .config import VALKEY_URL logger = logging.getLogger(__name__) - REGISTRY_CONFIG_SUFFIX = '.config/helm/registry.json' REPOSITORY_CACHE_SUFFIX = '.cache/helm/repository' REPOSITORY_CONFIG_SUFFIX = '.config/helm/repository' -INSTANCE_META_SCHEMA = { - "type": "object", - "properties": { - "id": {"type": "string"}, - "details": { - "type": "object", - "properties": { - "service_id": {"type": "string"}, - "plan_id": {"type": "string"}, - "context": {"type": "object"}, - "parameters": { - 'oneOf': [{'type': 'object'}, {'type': 'null'}] - }, - }, - "required": [ - "service_id", "plan_id", "context" - ] - }, - "last_operation": { - "type": "object", - "properties": { - "state": {"type": "string"}, - "operation": {"type": "string"}, - "description": {"type": "string"} - } - }, - "last_modified_time": {"type": "number"} - }, -} -BINDING_META_SCHEMA = { - "type": "object", - "properties": { - "id": {"type": "string"}, - "credentials": { - "type": "object", - }, - "last_operation": { - "type": "object", - "properties": { - "state": {"type": "string"}, - "description": {"type": "string"} - } - }, - "last_modified_time": {"type": "number"} - } -} -ADDONS_META_SCHEMA = { - "type": "object", - "patternProperties": { - ".*": { - "id": {"type": "string"}, - "name": {"type": "string"}, - "version": {"type": "string"}, - "bindable": {"type": "boolean"}, - "instances_retrievable": {"type": "boolean"}, - "bindings_retrievable": {"type": "boolean"}, - "allow_context_updates": {"type": "boolean"}, - "description": {"type": "string"}, - "tags": {"type": "string"}, - "requires": {"type": "array"}, - "metadata": {"type": "object"}, - "plan_updateable": {"type": "boolean"}, - "dashboard_client": {"type": "object"}, - "plans": { - "type": "object", - "id": {"type": "string"}, - "name": {"type": "string"}, - "description": {"type": "string"}, - "metadata": {"type": "object"}, - "free": {"type": "boolean"}, - "bindable": {"type": "boolean"}, - "binding_rotatable": {"type": "boolean"}, - "plan_updateable": {"type": "boolean"}, - "schemas": {"type": "object"}, - "maximum_polling_duration": {"type": "integer"}, - "maintenance_info": {"type": "object"}, - "required": [ - "id", "name", "description" - ] - }, - "required": [ - "id", "name", "description", "bindable", "version", "plans" - ] - } - } -} def command(cmd, *args, output_type="text"): - status, output = subprocess.getstatusoutput("%s %s" % (cmd, " ".join(args))) # noqa + status, output = subprocess.getstatusoutput("%s %s" % (cmd, " ".join(args))) if output_type == "yaml": return yaml.load(output, Loader=yaml.Loader) elif output_type == "json": @@ -113,13 +27,8 @@ def command(cmd, *args, output_type="text"): return status, output -get_instance_path = lambda instance_id: os.path.join(INSTANCES_PATH, instance_id) # noqa -get_instance_file = lambda instance_id: os.path.join(get_instance_path(instance_id), "instance.json") # noqa -get_chart_path = lambda instance_id: os.path.join(get_instance_path(instance_id), "chart") # noqa -get_plan_path = lambda instance_id: os.path.join(get_instance_path(instance_id), "plan") # noqa - - def helm(instance_id, *args, output_type="text"): + from .database.query import get_instance_path instance_path = get_instance_path(instance_id) new_args = [] new_args.extend(args) @@ -134,145 +43,51 @@ def helm(instance_id, *args, output_type="text"): return command("helm", *new_args, output_type=output_type) -def load_instance_meta(instance_id): - file = get_instance_file(instance_id) - with open(file) as f: - data = json.loads(f.read()) - validate(instance=data, schema=INSTANCE_META_SCHEMA) - return data - - -def dump_instance_meta(instance_id, data): - data["last_modified_time"] = time.time() - file = get_instance_file(instance_id) - validate(instance=data, schema=INSTANCE_META_SCHEMA) - with open(file, "w") as f: - f.write(json.dumps(data, sort_keys=True, indent=2)) - - -def dump_raw_values(instance_id, data): - timestamp = time.time() - instance_path = get_instance_path(instance_id) - file = f"{instance_path}/raw-values-{timestamp}.yaml" - with open(file, "w") as f: - f.write(data) - return file - - -def dump_addon_values(service_id, instance_id): - timestamp = time.time() - instance_path = get_instance_path(instance_id) - file = f"{instance_path}/addon-values-{timestamp}.yaml" - service = _get_addon_meta(service_id) - logger.debug(f"dump_addon_values service: {service}") - if not os.path.exists(f'{CONFIG_PATH}/addon-values'): - return None - with open(file, "w") as fw: - with open(f'{CONFIG_PATH}/addon-values', 'r') as f: - addons_values = yaml.load(f.read(), Loader=yaml.Loader) - logger.debug(f"dump_addon_values addons_values: {addons_values}") - addon_values = addons_values.get(service["name"], {}).\ - get(service["version"], {}) - logger.debug(f"dump_addon_values addon_values: {addon_values}") - if not addon_values: - return None - fw.write(yaml.dump(addon_values)) - return file - - -def load_binding_meta(instance_id): - file = os.path.join(get_instance_path(instance_id), "binding.json") - with open(file, 'r') as f: - data = json.loads(f.read()) - validate(instance=data, schema=INSTANCE_META_SCHEMA) - return data - - -def dump_binding_meta(instance_id, data): - data["last_modified_time"] = time.time() - file = os.path.join(get_instance_path(instance_id), "binding.json") - validate(instance=data, schema=INSTANCE_META_SCHEMA) - with open(file, "w") as f: - f.write(json.dumps(data, sort_keys=True, indent=2)) - - -def load_addons_meta(): - file = os.path.join(ADDONS_PATH, "addons.json") - with open(file, 'r') as f: - data = json.loads(f.read()) - if not data: - return {} - validate(instance=data, schema=INSTANCE_META_SCHEMA) - return data - - -def dump_addons_meta(data): - file = os.path.join(ADDONS_PATH, "addons.json") - validate(instance=data, schema=INSTANCE_META_SCHEMA) - with open(file, "w") as f: - f.write(json.dumps(data, sort_keys=True, indent=2)) - - -def get_addon_path(service_id, plan_id): - service = _get_addon_meta(service_id) - plan = [plan for plan in service['plans'] if plan['id'] == plan_id][0] - plan_name = plan['name'] - service_name_path = f'{service["name"]}-{service["version"]}' - base_path = f"{ADDONS_PATH}/{service_name_path}" - service_path = f'{base_path}/chart/{service["name"]}' - plan_path = f'{base_path}/plans/{plan_name}' - return service_path, plan_path - - -def get_addon_updateable(service_id): - service = _get_addon_meta(service_id) - return service.get('plan_updateable', False) - - -def get_addon_bindable(service_id): - service = _get_addon_meta(service_id) - return service.get('bindable', False) - - -def get_addon_allow_paras(service_id): - service = _get_addon_meta(service_id) - return service.get('allow_parameters', []) - - -def get_addon_archive(service_id): - service = _get_addon_meta(service_id) - return service.get('archive', False) - - -def get_cred_value(ns, source): - if source.get('serviceRef'): - return _get_service_key_value(ns, source['serviceRef']) - if source.get('configMapRef'): - return _get_config_map_key_value(ns, source['configMapRef']) - if source.get('secretKeyRef'): - return _get_secret_key_value(ns, source['secretKeyRef']) - return -1, 'invalid valueFrom' - - -class InstanceLock(object): - - def __init__(self, instance_id): - self.instance_id = instance_id - - def __enter__(self): - self.fileno = open( - os.path.join(INSTANCES_PATH, self.instance_id, "instance.lock"), - "w" +def get_valkey_client(): + url = urlparse(VALKEY_URL) + query = parse_qs(url.query) + if 'master_set' in query: + user, host = url.netloc.split("@") + password = user.split(":")[1] + sentinel = Sentinel( + [host.split(":")], + sentinel_kwargs={'password': password}, + password=password, ) - fcntl.flock(self.fileno, fcntl.LOCK_EX) - return self - - def __exit__(self, exc_type, exc_value, traceback): - fcntl.flock(self.fileno, fcntl.LOCK_UN) - - def __del__(self): - if hasattr(self, "fileno"): - fcntl.flock(self.fileno, fcntl.LOCK_UN) + return sentinel.master_for(query['master_set'][0], socket_timeout=1) + return Redis.from_url(VALKEY_URL) + + +def new_instance_lock(instance_id): + return get_valkey_client().lock(instance_id) + + +@contextmanager +def run_instance_hooks(instance_id, stage): + if stage not in ["provision", "bind", "unbind", "update", "deprovision"]: + raise ValueError(f"Unknown stage {stage}") + from .database.query import get_hooks_path + from .database.savepoint import save_hooks_result + pre_script_file = os.path.join(get_hooks_path(instance_id), f"pre_{stage}.sh") + post_script_file = os.path.join(get_hooks_path(instance_id), f"post_{stage}.sh") + logger.debug(f"instance hook running: {instance_id}, {instance_id}") + result = [] + try: + if os.path.exists(pre_script_file): + status, output = subprocess.getstatusoutput(pre_script_file) + result.append({"script": pre_script_file, "status": status, "output": output}) + else: + status, output = 0, f"skip running {pre_script_file}" + logger.debug(output) + yield status, output + finally: + if os.path.exists(pre_script_file): + status, output = subprocess.getstatusoutput(post_script_file) + result.append({"script": pre_script_file, "status": status, "output": output}) + else: + logger.debug(f"skip running {post_script_file}") + save_hooks_result(instance_id, result) + logger.debug(f"instance hook completed: {instance_id}, {instance_id}") def verify_parameters(allow_parameters, parameters): @@ -295,15 +110,14 @@ def merge_parameters(parameters): ) -def format_paras_to_helm_args(instance_id, parameters, args): - """ - - """ +def format_params_to_helm_args(instance_id, parameters, args): + """format helm args""" + from .database.savepoint import save_raw_values params = copy.deepcopy(parameters) if params and "rawValues" in params \ and params.get("rawValues", ""): - values = str(base64.b64decode(params["rawValues"]), "utf-8") # noqa - raw_values_file = dump_raw_values(instance_id, values) + values = str(base64.b64decode(params["rawValues"]), "utf-8") + raw_values_file = save_raw_values(instance_id, values) args.extend(["-f", raw_values_file]) params.pop("rawValues") if params: @@ -312,37 +126,6 @@ def format_paras_to_helm_args(instance_id, parameters, args): return args -def _get_addon_meta(service_id): - services = load_addons_meta() - service = [addon for addon in [addons for _, addons in services.items()] - if addon['id'] == service_id][0] - return service - - -def _get_service_key_value(ns, service_ref): - args = [ - "get", "svc", service_ref['name'], "-n", ns, '-o', f"jsonpath=\'{service_ref['jsonpath']}\'", # noqa - ] - return command("kubectl", *args) - - -def _get_config_map_key_value(ns, config_map_ref): - args = [ - "get", "cm", config_map_ref['name'], "-n", ns, '-o', f"jsonpath=\'{config_map_ref['jsonpath']}\'", # noqa - ] - return command("kubectl", *args) - - -def _get_secret_key_value(ns, secret_ref): - args = [ - "get", "secret", secret_ref['name'], "-n", ns, '-o', f"jsonpath=\'{secret_ref['jsonpath']}\'", # noqa - ] - status, output = command("kubectl", *args) - if status == 0: - output = base64.b64decode(output).decode() - return status, output - - def _raw_values_format_keys(raw_values, prefix=''): """ {'a': {'b': 1, 'c': {'d': 2, 'e': 3}}, 'f': 4} @@ -386,3 +169,52 @@ def _verify_required_parameters(allow_parameters, parameters): if error: error_parameters.add(allow_parameter["name"]) return error_parameters + + +def verify_parameters_by_plan(instance_id, parameters): + """verify parameters allowed or not""" + if not parameters: + return "" + # read schema file + from .database.query import get_plan_schema_path + schema_file = get_plan_schema_path(instance_id) + try: + with open(schema_file, 'r') as f: + schema = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return "" + if not schema: + return "" + # get parameters + if "rawValues" in parameters: + params = yaml.safe_load(base64.b64decode(parameters["rawValues"])) + else: + params = _convert_to_nested_dict(parameters) + # validate schema + try: + jsonschema.validate(params, schema) + except jsonschema.ValidationError as e: + return f"could not validate: {e.message}" + return "" + + +def _convert_to_nested_dict(assignments): + """ + {"a.b.c": "1Gi", "a.b.d": "2Gi"} + -> + {'a': {'b': {'c': '1Gi', 'd': '2Gi'}}} + """ + def set_nested_value(d, keys, value): + if len(keys) == 1: + d[keys[0]] = value + else: + if keys[0] not in d: + d[keys[0]] = {} + set_nested_value(d[keys[0]], keys[1:], value) + result = {} + if isinstance(assignments, dict): + # dict format: {"a.b.c": "1Gi"} + for key_path, value in assignments.items(): + keys = key_path.split('.') + set_nested_value(result, keys, value) + return result diff --git a/rootfs/helmbroker/wsgi.py b/rootfs/helmbroker/wsgi.py index ac89884..206c711 100644 --- a/rootfs/helmbroker/wsgi.py +++ b/rootfs/helmbroker/wsgi.py @@ -1,4 +1,5 @@ import os +import logging from flask import Flask, make_response from openbrokerapi import api, log_util from helmbroker.broker import HelmServiceBroker @@ -17,8 +18,7 @@ def readiness(): if "KUBECONFIG" in os.environ: return "OK" elif "KUBERNETES_SERVICE_PORT" in os.environ and \ - ("KUBERNETES_SERVICE_HOST" in os.environ or - "KUBERNETES_CLUSTER_DOMAIN" in os.environ): + "KUBERNETES_SERVICE_HOST" in os.environ: return "OK" return make_response("kubernetes not available", 500) @@ -27,5 +27,5 @@ def readiness(): catalog_api = api.get_blueprint( HelmServiceBroker(), api.BrokerCredentials(USERNAME, PASSWORD), - log_util.basic_config()) + log_util.basic_config(level=logging.DEBUG if Config.DEBUG else logging.INFO)) application.register_blueprint(catalog_api) diff --git a/rootfs/requirements.txt b/rootfs/requirements.txt index 9a30c09..7810104 100644 --- a/rootfs/requirements.txt +++ b/rootfs/requirements.txt @@ -1,6 +1,7 @@ -PyYAML==6.0 -gunicorn==20.1.0 -openbrokerapi==4.5.5 -requests==2.31.0 -celery==5.3.1 -jsonschema==4.17.3 \ No newline at end of file +PyYAML==6.0.2 +gunicorn==23.0.0 +openbrokerapi==4.7.1 +requests==2.32.5 +celery==5.5.3 +redis==6.4.0 +jsonschema==4.25.1 diff --git a/rootfs/setup.cfg b/rootfs/setup.cfg new file mode 100644 index 0000000..e000ebb --- /dev/null +++ b/rootfs/setup.cfg @@ -0,0 +1,4 @@ +[flake8] +max-line-length = 99 +exclude = api/migrations,templates,venv +max-complexity = 12 \ No newline at end of file