diff --git a/.woodpecker/build-linux.yml b/.woodpecker/build-linux.yml index 782d2f5..ef4ecfd 100644 --- a/.woodpecker/build-linux.yml +++ b/.woodpecker/build-linux.yml @@ -14,16 +14,22 @@ steps: - export VERSION=$([ -z $CI_COMMIT_TAG ] && echo latest || echo $CI_COMMIT_TAG)-$(sed 's#/#-#g' <<< $CI_SYSTEM_PLATFORM) - echo $CONTAINER_PASSWORD | podman login $DRYCC_REGISTRY --username $CONTAINER_USERNAME --password-stdin > /dev/null 2>&1 - make podman-build podman-immutable-push - secrets: - - codename - - dev_registry - - drycc_registry - - container_username - - container_password + environment: + CODENAME: + from_secret: codename + DEV_REGISTRY: + from_secret: dev_registry + DRYCC_REGISTRY: + from_secret: drycc_registry + CONTAINER_USERNAME: + from_secret: container_username + CONTAINER_PASSWORD: + from_secret: container_password when: event: - push - tag + - cron depends_on: - test-linux \ No newline at end of file diff --git a/.woodpecker/chart.yaml b/.woodpecker/chart.yaml index 214841c..d40739c 100644 --- a/.woodpecker/chart.yaml +++ b/.woodpecker/chart.yaml @@ -11,18 +11,24 @@ steps: - export APP_VERSION=$([ -z $CI_COMMIT_TAG ] && echo $CI_COMMIT_SHA || echo $VERSION) - export CHART_VERSION=$([ -z $CI_COMMIT_TAG ] && echo 1.0.0 || echo $VERSION) - sed -i "s/imageTag:\ \"canary\"/imageTag:\ $IMAGE_TAG/g" charts/$${CI_REPO_NAME}/values.yaml + - sed -i s#{{repository}}#oci://$DRYCC_REGISTRY/$([ -z $CI_COMMIT_TAG ] && echo charts-testing || echo charts)#g charts/$${CI_REPO_NAME}/Chart.yaml - helm package -u charts/$${CI_REPO_NAME} --version $CHART_VERSION --app-version $APP_VERSION - echo $CONTAINER_PASSWORD | helm registry login $DRYCC_REGISTRY -u $CONTAINER_USERNAME --password-stdin - helm push $${CI_REPO_NAME}-$CHART_VERSION.tgz oci://$DRYCC_REGISTRY/$([ -z $CI_COMMIT_TAG ] && echo charts-testing || echo charts) - secrets: - - dev_registry - - drycc_registry - - container_username - - container_password + environment: + DEV_REGISTRY: + from_secret: dev_registry + DRYCC_REGISTRY: + from_secret: drycc_registry + CONTAINER_USERNAME: + from_secret: container_username + CONTAINER_PASSWORD: + from_secret: container_password when: event: - push - tag + - cron depends_on: - manifest \ No newline at end of file diff --git a/.woodpecker/manifest.yml b/.woodpecker/manifest.yml index b4735ee..7330ea0 100644 --- a/.woodpecker/manifest.yml +++ b/.woodpecker/manifest.yml @@ -8,12 +8,14 @@ steps: commands: - sed -i "s/{{project}}/$${CI_REPO_NAME}/g" .woodpecker/manifest.tmpl - sed -i "s/registry.drycc.cc/$${DRYCC_REGISTRY}/g" .woodpecker/manifest.tmpl - secrets: - - drycc_registry + environment: + DRYCC_REGISTRY: + from_secret: drycc_registry when: event: - tag - push + - cron - name: publish-manifest image: bash @@ -26,13 +28,16 @@ steps: -v $(pwd):$(pwd) -w $(pwd) docker.io/plugins/manifest - secrets: - - container_username - - container_password + environment: + CONTAINER_USERNAME: + from_secret: container_username + CONTAINER_PASSWORD: + from_secret: container_password when: event: - tag - push + - cron depends_on: - build-linux diff --git a/.woodpecker/test-linux.yml b/.woodpecker/test-linux.yml index 16a01b5..ab8ee69 100644 --- a/.woodpecker/test-linux.yml +++ b/.woodpecker/test-linux.yml @@ -12,11 +12,15 @@ steps: image: bash commands: - make test upload-coverage - secrets: - - codename - - dev_registry - - codecov_token + environment: + CODENAME: + from_secret: codename + DEV_REGISTRY: + from_secret: dev_registry + CODECOV_TOKEN: + from_secret: codecov_token when: event: - push - tag + - cron diff --git a/charts/helmbroker/Chart.yaml b/charts/helmbroker/Chart.yaml index da3007f..6e8d2aa 100644 --- a/charts/helmbroker/Chart.yaml +++ b/charts/helmbroker/Chart.yaml @@ -6,12 +6,10 @@ dependencies: - name: common repository: oci://registry.drycc.cc/charts version: ~1.1.2 - - name: redis - repository: oci://registry.drycc.cc/charts - version: x.x.x - - name: rabbitmq - repository: oci://registry.drycc.cc/charts + - name: valkey + repository: {{repository}} version: x.x.x + condition: valkey.enabled description: Drycc Workflow helmbroker. maintainers: - name: Drycc Team diff --git a/charts/helmbroker/templates/_helpers.tpl b/charts/helmbroker/templates/_helpers.tpl index cc0c838..3e9e40f 100644 --- a/charts/helmbroker/templates/_helpers.tpl +++ b/charts/helmbroker/templates/_helpers.tpl @@ -7,39 +7,20 @@ env: value: {{ if .Values.username | default "" | ne "" }}{{ .Values.username }}{{ else }}{{ randAlphaNum 32 }}{{ end }} - name: HELMBROKER_PASSWORD value: {{ if .Values.password | default "" | ne "" }}{{ .Values.password }}{{ else }}{{ randAlphaNum 32 }}{{ end }} -{{- if (.Values.rabbitmqUrl) }} -- name: HELMBROKER_RABBITMQ_URL - value: {{ .Values.rabbitmqUrl }} -{{- else if eq .Values.global.rabbitmqLocation "on-cluster" }} -- name: "HELMBROKER_RABBITMQ_USERNAME" +{{- if (.Values.valkeyUrl) }} +- name: HELMBROKER_VALKEY_URL valueFrom: secretKeyRef: - name: rabbitmq-creds - key: username -- name: "HELMBROKER_RABBITMQ_PASSWORD" + name: helmbroker-creds + key: valkey-url +{{- else if .Values.valkey.enabled }} +- name: VALKEY_PASSWORD valueFrom: secretKeyRef: - name: rabbitmq-creds + name: valkey-creds key: password -- name: "HELMBROKER_RABBITMQ_URL" - value: "amqp://$(HELMBROKER_RABBITMQ_USERNAME):$(HELMBROKER_RABBITMQ_PASSWORD)@drycc-rabbitmq.{{$.Release.Namespace}}.svc.{{$.Values.global.clusterDomain}}:5672/helmbroker" -{{- end }} -{{- if (.Values.redisUrl) }} -- name: HELMBROKER_REDIS_URL - value: {{ .Values.redisUrl }} -{{- else if eq .Values.global.redisLocation "on-cluster" }} -- name: "HELMBROKER_REDIS_ADDRS" - valueFrom: - secretKeyRef: - name: redis-creds - key: addrs -- name: "HELMBROKER_REDIS_PASSWORD" - valueFrom: - secretKeyRef: - name: redis-creds - key: password -- name: "HELMBROKER_REDIS_URL" - value: "redis://:$(HELMBROKER_REDIS_PASSWORD)@$(HELMBROKER_REDIS_ADDRS)/0" +- name: HELMBROKER_VALKEY_URL + value: "redis://:$(VALKEY_PASSWORD)@drycc-valkey:26379/0?master_set=drycc" {{- end }} {{- range $key, $value := .Values.environment }} - name: {{ $key }} @@ -47,20 +28,6 @@ env: {{- end }} {{- end }} -{{/* Generate helmbroker deployment limits */}} -{{- define "helmbroker.limits" -}} -{{- if or (.Values.limitsCpu) (.Values.limitsMemory) }} -resources: - limits: -{{- if (.Values.limitsCpu) }} - cpu: {{.Values.limitsCpu}} -{{- end }} -{{- if (.Values.limitsMemory) }} - memory: {{.Values.limitsMemory}} -{{- end }} -{{- end }} -{{- end }} - {{/* Generate helmbroker deployment volumeMounts */}} {{- define "helmbroker.volumeMounts" }} volumeMounts: diff --git a/charts/helmbroker/templates/helmbroker-celery-deployment.yaml b/charts/helmbroker/templates/helmbroker-celery-deployment.yaml index 7388b2c..1b39b78 100644 --- a/charts/helmbroker/templates/helmbroker-celery-deployment.yaml +++ b/charts/helmbroker/templates/helmbroker-celery-deployment.yaml @@ -7,7 +7,7 @@ metadata: annotations: component.drycc.cc/version: {{ .Values.imageTag }} spec: - replicas: {{ .Values.celeryReplicas }} + replicas: {{ .Values.celery.replicas }} strategy: rollingUpdate: maxSurge: 1 @@ -33,11 +33,12 @@ spec: args: - netcat - -v - - -a - - $(DRYCC_HELMBROKER_SERVICE_HOST):$(DRYCC_HELMBROKER_SERVICE_PORT) + - -u + - http://drycc-helmbroker {{- include "helmbroker.envs" . | indent 10 }} containers: - - name: drycc-helmbroker-celery + {{- range $key := (list "low" "middle" "high") }} + - name: drycc-helmbroker-celery-{{$key}} image: {{$.Values.imageRegistry}}/{{$.Values.imageOrg}}/helmbroker:{{$.Values.imageTag}} imagePullPolicy: {{$.Values.imagePullPolicy}} {{- if $.Values.diagnosticMode.enabled }} @@ -47,9 +48,17 @@ spec: args: - /bin/bash - -c - - celery --app helmbroker worker --queues helmbroker.low,helmbroker.middle,helmbroker.high --autoscale=32,1 --loglevel=WARNING + - celery --app helmbroker worker -n {{uuidv4}}@%h --queues helmbroker.{{$key}} --autoscale=32,1 --loglevel=WARNING + {{- end }} + {{- with index $.Values "celery" "resources" }} + resources: + {{- toYaml . | nindent 10 }} {{- end }} - {{- include "helmbroker.limits" $ | indent 8 }} {{- include "helmbroker.envs" $ | indent 8 }} {{- include "helmbroker.volumeMounts" $ | indent 8 }} + {{- end }} {{- include "helmbroker.volumes" . | indent 6 }} + securityContext: + fsGroup: 1001 + runAsGroup: 1001 + runAsUser: 1001 diff --git a/charts/helmbroker/templates/helmbroker-deployment.yaml b/charts/helmbroker/templates/helmbroker-deployment.yaml index 7a711ed..45cc4cb 100644 --- a/charts/helmbroker/templates/helmbroker-deployment.yaml +++ b/charts/helmbroker/templates/helmbroker-deployment.yaml @@ -7,7 +7,7 @@ metadata: annotations: component.drycc.cc/version: {{ .Values.imageTag }} spec: - replicas: {{ .Values.replicas }} + replicas: {{ .Values.api.replicas }} strategy: rollingUpdate: maxSurge: 1 @@ -34,7 +34,7 @@ spec: - netcat - -v - -u - - $(HELMBROKER_REDIS_URL),$(HELMBROKER_RABBITMQ_URL) + - $(HELMBROKER_VALKEY_URL) {{- include "helmbroker.envs" . | indent 10 }} - name: drycc-helmbroker-fetch image: {{.Values.imageRegistry}}/{{.Values.imageOrg}}/helmbroker:{{.Values.imageTag}} @@ -71,7 +71,14 @@ spec: ports: - containerPort: 8000 name: http - {{- include "helmbroker.limits" . | indent 8 }} + {{- with index .Values "api" "resources" }} + resources: + {{- toYaml . | nindent 10 }} + {{- end }} {{- include "helmbroker.envs" . | indent 8 }} {{- include "helmbroker.volumeMounts" . | indent 8 }} {{- include "helmbroker.volumes" . | indent 6 }} + securityContext: + fsGroup: 1001 + runAsGroup: 1001 + runAsUser: 1001 diff --git a/charts/helmbroker/templates/helmbroker-secret-creds.yaml b/charts/helmbroker/templates/helmbroker-secret-creds.yaml new file mode 100644 index 0000000..29b1bc3 --- /dev/null +++ b/charts/helmbroker/templates/helmbroker-secret-creds.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Secret +metadata: + name: helmbroker-creds + labels: + heritage: drycc +data: + {{- if (.Values.valkeyUrl) }} + valkey-url: {{ .Values.valkeyUrl | b64enc }} + {{- end }} \ No newline at end of file diff --git a/charts/helmbroker/values.yaml b/charts/helmbroker/values.yaml index d8c5375..65b9f54 100644 --- a/charts/helmbroker/values.yaml +++ b/charts/helmbroker/values.yaml @@ -2,9 +2,7 @@ imageOrg: "drycc-addons" imageTag: "canary" imageRegistry: "registry.drycc.cc" imagePullPolicy: "Always" -replicas: 1 -# limitsCpu: "100m" -# limitsMemory: "50Mi" + ## Enable diagnostic mode ## @@ -23,21 +21,16 @@ diagnosticMode: ## config the helm-broker repositories repositories: -- name: drycc-helm-broker - url: https://github.com/drycc/addons/releases/download/latest/index.yaml - -celeryReplicas: 1 + - name: drycc-helm-broker + url: https://github.com/drycc/addons/releases/download/latest/index.yaml # broker_credentials: # Optional Usernames and passwords that will be required to communicate with service broker username: admin password: admin -# Configuring this will no longer use the built-in redis component -redisUrl: "" - -# Configuring this will no longer use the built-in rabbitmq component -rabbitmqUrl: "" +# Configuring this will no longer use the built-in valkey component +valkeyUrl: "" # Any custom controller environment variables # can be specified as key-value pairs under environment @@ -47,11 +40,19 @@ environment: # HELMBROKER_CONFIG_ROOT: /etc/helmbroker api: + replicas: 1 + resources: {} + # limits: + # cpu: 200m + # memory: 50Mi + # requests: + # cpu: 100m + # memory: 30Mi nodeAffinityPreset: key: "drycc.cc/node" type: "soft" values: - - "true" + - "true" podAffinityPreset: type: "" extraMatchLabels: @@ -62,11 +63,19 @@ api: app: "drycc-helmbroker" celery: + replicas: 1 + resources: {} + # limits: + # cpu: 200m + # memory: 50Mi + # requests: + # cpu: 100m + # memory: 30Mi nodeAffinityPreset: key: "drycc.cc/node" type: "soft" values: - - "true" + - "true" podAffinityPreset: type: "" extraMatchLabels: @@ -76,10 +85,6 @@ celery: extraMatchLabels: app: "drycc-helmbroker-celery" -# drycc redis replicas must always be set to 1 -redis: - replicas: 1 - # Default override of addon values addonValues: {} @@ -90,25 +95,5 @@ persistence: storageClass: "" volumeName: "" -global: - # Set the location of Workflow's redis instance - # Valid values are: - # - on-cluster: Run Redis within the Kubernetes cluster - # - off-cluster: Run Redis outside the Kubernetes cluster (configure in controller section) - redisLocation: "on-cluster" - # Set the location of Workflow's rabbitmq instance - # Valid values are: - # - on-cluster: Run Rabbitmq within the Kubernetes cluster - # - off-cluster: Run Rabbitmq outside the Kubernetes cluster (configure in controller section) - rabbitmqLocation: "on-cluster" - # Enable usage of RBAC authorization mode - # - # Valid values are: - # - true: all RBAC-related manifests will be installed (in case your cluster supports RBAC) - # - false: no RBAC-related manifests will be installed - rbac: true - # A domain name consists of one or more parts. - # Periods (.) are used to separate these parts. - # Each part must be 1 to 63 characters in length and can contain lowercase letters, digits, and hyphens (-). - # It must start and end with a lowercase letter or digit. - clusterDomain: "cluster.local" +valkey: + enabled: true diff --git a/rootfs/Dockerfile b/rootfs/Dockerfile index e97938a..85d3134 100644 --- a/rootfs/Dockerfile +++ b/rootfs/Dockerfile @@ -4,9 +4,9 @@ FROM registry.drycc.cc/drycc/base:${CODENAME} ENV DRYCC_UID=1001 \ DRYCC_GID=1001 \ DRYCC_HOME_DIR=/workspace \ - PYTHON_VERSION="3.12" \ - HELM_VERSION="3.15.3" \ - KUBECTL_VERSION="1.30.3" + PYTHON_VERSION="3.14" \ + HELM_VERSION="4.1.3" \ + KUBECTL_VERSION="1.35.3" RUN groupadd drycc --gid ${DRYCC_GID} \ && useradd drycc -u ${DRYCC_UID} -g ${DRYCC_GID} -s /bin/bash -m -d ${DRYCC_HOME_DIR} diff --git a/rootfs/Dockerfile.test b/rootfs/Dockerfile.test index b5c1d34..fe48d53 100644 --- a/rootfs/Dockerfile.test +++ b/rootfs/Dockerfile.test @@ -4,9 +4,9 @@ FROM registry.drycc.cc/drycc/base:${CODENAME} ENV DRYCC_UID=1001 \ DRYCC_GID=1001 \ DRYCC_HOME_DIR=/workspace \ - PYTHON_VERSION="3.12" \ - HELM_VERSION="3.15.3" \ - KUBECTL_VERSION="1.30.3" + PYTHON_VERSION="3.14" \ + HELM_VERSION="4.1.3" \ + KUBECTL_VERSION="1.35.3" RUN groupadd drycc --gid ${DRYCC_GID} \ && useradd drycc -u ${DRYCC_UID} -g ${DRYCC_GID} -s /bin/bash -m -d ${DRYCC_HOME_DIR} diff --git a/rootfs/helmbroker/broker.py b/rootfs/helmbroker/broker.py index 8b38269..c920789 100644 --- a/rootfs/helmbroker/broker.py +++ b/rootfs/helmbroker/broker.py @@ -1,6 +1,6 @@ import os -import shutil import logging +import time from typing import Union, List, Optional from openbrokerapi.catalog import ServicePlan @@ -13,13 +13,15 @@ UpdateDetails, UpdateServiceSpec, DeprovisionDetails, \ DeprovisionServiceSpec, LastOperation, OperationState -from .utils import verify_parameters, new_instance_lock +from .utils import verify_parameters, new_instance_lock, verify_parameters_by_plan from .database.fetch import fetch_chart_plan from .database.query import get_instance_path, get_chart_path, get_plan_path, \ get_addon_updateable, get_addon_bindable, get_addon_allow_params, \ get_addon_archive, get_binding_file, get_instance_file -from .database.metadata import load_instance_meta, load_binding_meta, load_addons_meta +from .database.metadata import load_instance_meta, load_binding_meta, load_addons_meta, \ + save_instance_meta from .tasks import provision, bind, deprovision, update, unbind +from .config import INSTANCE_NAME_LENS logger = logging.getLogger(__name__) @@ -42,6 +44,11 @@ def provision(self, details: ProvisionDetails, async_allowed: bool, **kwargs) -> ProvisionedServiceSpec: + logger.debug(f"*** provision instance {instance_id}") + # verify instance_name length + if len(details.context["instance_name"]) > INSTANCE_NAME_LENS: + raise ErrBadRequest( + msg=f"The length of the instance name cannot exceed {INSTANCE_NAME_LENS}.") instance_path = get_instance_path(instance_id) if os.path.exists(instance_path): raise ErrInstanceAlreadyExists() @@ -62,6 +69,10 @@ def provision(self, os.makedirs(instance_path, exist_ok=True) chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) fetch_chart_plan(details.service_id, chart_path, details.plan_id, plan_path) + # verify instance-schema + msg = verify_parameters_by_plan(instance_id, details.parameters) + if msg: + raise ErrBadRequest(msg) provision.delay(instance_id, details) return ProvisionedServiceSpec(state=ProvisionState.IS_ASYNC) @@ -82,6 +93,7 @@ def bind(self, async_allowed: bool, **kwargs ) -> Binding: + logger.debug(f"*** bind instance {instance_id}") is_addon_bindable = get_addon_bindable(details.service_id) if not is_addon_bindable: raise ErrBadRequest( @@ -94,9 +106,6 @@ def bind(self, instance_path = get_instance_path(instance_id) if os.path.exists(f'{instance_path}/bind.json'): raise ErrBindingAlreadyExists() - chart_path, plan_path = ( - get_chart_path(instance_id), get_plan_path(instance_id)) - shutil.copy(f'{plan_path}/bind.yaml', f'{chart_path}/templates') bind(instance_id, binding_id, details, async_allowed, **kwargs) data = load_binding_meta(instance_id) if data["last_operation"]["state"] == OperationState.SUCCEEDED.value: @@ -112,7 +121,7 @@ def unbind(self, async_allowed: bool, **kwargs ) -> UnbindSpec: - logger.debug(f"unbind instance {instance_id}") + logger.debug(f"*** unbind instance {instance_id}") unbind.delay(instance_id) return UnbindSpec(is_async=True) @@ -122,6 +131,7 @@ def update(self, async_allowed: bool, **kwargs ) -> UpdateServiceSpec: + logger.debug(f"*** update instance {instance_id}") instance_path = get_instance_path(instance_id) if not os.path.exists(instance_path): raise ErrBadRequest(msg="Instance %s does not exist" % instance_id) @@ -145,6 +155,15 @@ def update(self, if details.plan_id is not None: chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) fetch_chart_plan(details.service_id, chart_path, details.plan_id, plan_path) + # verify instance-schema + msg = verify_parameters_by_plan(instance_id, details.parameters) + if msg: + raise ErrBadRequest(msg) + data = load_instance_meta(instance_id) + data['last_operation']["state"] = OperationState.IN_PROGRESS.value + data['last_operation']["description"] = ( + f"update {instance_id} in progress at {time.time()}") + save_instance_meta(instance_id, data) update.delay(instance_id, details) return UpdateServiceSpec(is_async=True) @@ -153,6 +172,7 @@ def deprovision(self, details: DeprovisionDetails, async_allowed: bool, **kwargs) -> DeprovisionServiceSpec: + logger.debug(f"*** deprovision instance {instance_id}") if not os.path.exists(get_instance_path(instance_id)): raise ErrInstanceDoesNotExist() with new_instance_lock(instance_id): @@ -172,6 +192,7 @@ def last_operation(self, operation_data: Optional[str], **kwargs ) -> LastOperation: + logger.debug(f"*** last_operation instance {instance_id}") if os.path.exists(get_instance_file(instance_id)): data = load_instance_meta(instance_id) return LastOperation( @@ -186,6 +207,7 @@ def last_binding_operation(self, operation_data: Optional[str], **kwargs ) -> LastOperation: + logger.debug(f"*** last_binding_operation instance {instance_id}") if os.path.exists(get_binding_file(instance_id)): data = load_binding_meta(instance_id) return LastOperation( diff --git a/rootfs/helmbroker/celery.py b/rootfs/helmbroker/celery.py index 67f810f..45d4181 100644 --- a/rootfs/helmbroker/celery.py +++ b/rootfs/helmbroker/celery.py @@ -1,28 +1,32 @@ import os +from urllib.parse import urlparse, parse_qs, urlencode from kombu import Exchange, Queue from celery import Celery +from .config import VALKEY_URL class Config(object): # Celery Configuration Options - timezone = "Asia/Shanghai" enable_utc = True task_serializer = 'pickle' accept_content = frozenset([ - 'application/data', - 'application/text', - 'application/json', - 'application/x-python-serialize', + 'application/data', + 'application/text', + 'application/json', + 'application/x-python-serialize', ]) task_track_started = True task_time_limit = 30 * 60 worker_max_tasks_per_child = 200 + worker_prefetch_multiplier = 1 result_expires = 24 * 60 * 60 - broker_url = os.environ.get("HELMBROKER_RABBITMQ_URL", 'amqp://guest:guest@127.0.0.1:5672/') - broker_connection_retry_on_startup = True - task_default_queue = 'helmbroker.low' + cache_backend = 'django-cache' + task_default_queue = 'helmbroker.middle' task_default_exchange = 'helmbroker.priority' - task_default_routing_key = 'helmbroker.priority.low' + task_default_routing_key = 'helmbroker.priority.middle' + broker_transport_options = {"queue_order_strategy": "sorted"} + task_create_missing_queues = True + task_inherit_parent_priority = True broker_connection_retry_on_startup = True worker_cancel_long_running_tasks_on_connection_loss = True @@ -30,6 +34,7 @@ class Config(object): app = Celery('helmbroker') app.config_from_object(Config()) app.conf.update( + timezone=os.environ.get('TZ', 'UTC'), task_routes={ 'helmbroker.tasks.provision': { 'queue': 'helmbroker.high', @@ -51,22 +56,36 @@ class Config(object): task_queues=( Queue( 'helmbroker.low', exchange=Exchange('helmbroker.priority', type="direct"), - routing_key='helmbroker.priority.low', queue_arguments={'x-queue-type': 'quorum'}, + routing_key='helmbroker.priority.low', ), Queue( 'helmbroker.high', exchange=Exchange('helmbroker.priority', type="direct"), - routing_key='helmbroker.priority.high', queue_arguments={'x-queue-type': 'quorum'}, + routing_key='helmbroker.priority.high', ), Queue( 'helmbroker.middle', exchange=Exchange('helmbroker.priority', type="direct"), - routing_key='helmbroker.priority.middle', queue_arguments={'x-queue-type': 'quorum'}, + routing_key='helmbroker.priority.middle', ), ), ) app.autodiscover_tasks(("helmbroker.tasks",)) - - -app.config_from_object(Config()) +url = urlparse(VALKEY_URL) +query = parse_qs(url.query) +broker_transport_options = {"queue_order_strategy": "sorted", "visibility_timeout": 43200} +result_backend_transport_options = {} +if 'master_set' in query: + master_name = query.pop('master_set')[0] + password = url.netloc.split("@")[0].split(":")[1] + kwargs = {'sentinel_kwargs': {'password': password}, 'master_name': master_name} + broker_transport_options.update(kwargs) + result_backend_transport_options.update(kwargs) + VALKEY_URL = f"sentinel://{url.netloc}{url.path}?{urlencode(query)}" +app.conf.update( + broker_url=VALKEY_URL, + result_backend=VALKEY_URL, + broker_transport_options=broker_transport_options, + result_backend_transport_options=result_backend_transport_options, +) if __name__ == '__main__': app.start() diff --git a/rootfs/helmbroker/config.py b/rootfs/helmbroker/config.py index 6def522..c83ad94 100644 --- a/rootfs/helmbroker/config.py +++ b/rootfs/helmbroker/config.py @@ -10,8 +10,9 @@ USERNAME = os.environ.get('HELMBROKER_USERNAME') PASSWORD = os.environ.get('HELMBROKER_PASSWORD') -REDIS_URL = os.environ.get("HELMBROKER_REDIS_URL", 'redis://localhost:6379/0') +VALKEY_URL = os.environ.get("HELMBROKER_VALKEY_URL", 'redis://localhost:6379/0') +INSTANCE_NAME_LENS = int(os.environ.get("INSTANCE_NAME_LENS", '32')) class Config: - DEBUG = bool(os.environ.get('HELMBROKER_DEBUG', True)) + DEBUG = bool(os.environ.get('HELMBROKER_DEBUG', False)) diff --git a/rootfs/helmbroker/database/fetch.py b/rootfs/helmbroker/database/fetch.py index 4fe252f..d2d8102 100644 --- a/rootfs/helmbroker/database/fetch.py +++ b/rootfs/helmbroker/database/fetch.py @@ -58,7 +58,7 @@ def _fetch_addon(url, dest): os.makedirs(dest, exist_ok=True) with tarfile.open(fileobj=tgz_file, mode="r:gz") as tarobj: for tarinfo in tarobj: - tarobj.extract(tarinfo.name, dest) + tarobj.extract(tarinfo.name, dest, filter='data') filename1 = os.path.join(dest, "meta.yaml") with open(filename1, "r") as f1: meta = yaml.load(stream=f1, Loader=yaml.Loader) diff --git a/rootfs/helmbroker/database/metadata.py b/rootfs/helmbroker/database/metadata.py index 6b37041..2877aaa 100644 --- a/rootfs/helmbroker/database/metadata.py +++ b/rootfs/helmbroker/database/metadata.py @@ -4,8 +4,8 @@ import logging import jsonschema -from redis import client -from ..config import ADDONS_PATH, REDIS_URL +from ..utils import get_valkey_client +from ..config import ADDONS_PATH logger = logging.getLogger(__name__) @@ -116,21 +116,20 @@ def save_instance_meta(instance_id, data): json_data = json.dumps(data, sort_keys=True, indent=2) with open(file, "w") as f: f.write(json_data) - redis = client.Redis.from_url(REDIS_URL) - redis.set(cache_key, json_data) + get_valkey_client().set(cache_key, json_data) def load_instance_meta(instance_id): cache_key = f"helmbroker:instance:{instance_id}" - redis = client.Redis.from_url(REDIS_URL) + valkey = get_valkey_client() - json_data = redis.get(cache_key) + json_data = valkey.get(cache_key) if not json_data: from .query import get_instance_file file = get_instance_file(instance_id) with open(file) as f: json_data = f.read() - redis.set(cache_key, json_data) + valkey.set(cache_key, json_data) return json.loads(json_data) @@ -144,20 +143,19 @@ def save_binding_meta(instance_id, data): json_data = json.dumps(data, sort_keys=True, indent=2) with open(file, "w") as f: f.write(json_data) - redis = client.Redis.from_url(REDIS_URL) - redis.set(cache_key, json_data) + get_valkey_client().set(cache_key, json_data) def load_binding_meta(instance_id): from .query import get_binding_file cache_key = f"helmbroker:binding:{instance_id}" - redis = client.Redis.from_url(REDIS_URL) - json_data = redis.get(cache_key) + valkey = get_valkey_client() + json_data = valkey.get(cache_key) if not json_data: file = get_binding_file(instance_id) with open(file, 'r') as f: json_data = f.read() - redis.set(cache_key, json_data) + valkey.set(cache_key, json_data) return json.loads(json_data) @@ -170,18 +168,17 @@ def save_addons_meta(data): json_data = json.dumps(data, sort_keys=True, indent=2) with open(file, "w") as f: f.write(json_data) - redis = client.Redis.from_url(REDIS_URL) - redis.set(cache_key, json_data) + get_valkey_client().set(cache_key, json_data) def load_addons_meta(): cache_key = "helmbroker:addons" - redis = client.Redis.from_url(REDIS_URL) + valkey = get_valkey_client() - json_data = redis.get(cache_key) + json_data = valkey.get(cache_key) if not json_data: file = os.path.join(ADDONS_PATH, "addons.json") with open(file, 'r') as f: json_data = f.read() - redis.set(cache_key, json_data) + valkey.set(cache_key, json_data) return json.loads(json_data) diff --git a/rootfs/helmbroker/database/query.py b/rootfs/helmbroker/database/query.py index b7aa331..741cfc3 100644 --- a/rootfs/helmbroker/database/query.py +++ b/rootfs/helmbroker/database/query.py @@ -22,6 +22,10 @@ def get_plan_path(instance_id): return os.path.join(get_instance_path(instance_id), "plan") +def get_plan_schema_path(instance_id): + return os.path.join(get_instance_path(instance_id), "plan", "instance-schema.json") + + def get_hooks_path(instance_id): return os.path.join(get_plan_path(instance_id), "hooks") diff --git a/rootfs/helmbroker/gunicorn/logging.py b/rootfs/helmbroker/gunicorn/logging.py index f9e65f3..41a3f9e 100644 --- a/rootfs/helmbroker/gunicorn/logging.py +++ b/rootfs/helmbroker/gunicorn/logging.py @@ -1,12 +1,12 @@ -import os from gunicorn.glogging import Logger +from helmbroker.config import Config class Logging(Logger): def access(self, resp, req, environ, request_time): # health check endpoints are only logged in debug mode if ( - not os.environ.get('DEBUG', False) and + not Config.DEBUG and req.path in ['/readiness', '/healthz'] ): return diff --git a/rootfs/helmbroker/tasks.py b/rootfs/helmbroker/tasks.py index 8f19af7..69f2f30 100644 --- a/rootfs/helmbroker/tasks.py +++ b/rootfs/helmbroker/tasks.py @@ -2,6 +2,7 @@ import time import yaml import logging +import shutil from openbrokerapi.service_broker import ProvisionDetails, OperationState, \ UpdateDetails, BindDetails @@ -19,10 +20,12 @@ @app.task(serializer='pickle') def provision(instance_id: str, details: ProvisionDetails): + logger.debug(f"*** task provision instance: {instance_id}, before lock") with ( new_instance_lock(instance_id), run_instance_hooks(instance_id, "provision") as (status, output) ): + logger.debug(f"*** task provision instance: {instance_id}") backup_instance(instance_id) # create instance.json data = { @@ -77,10 +80,12 @@ def provision(instance_id: str, details: ProvisionDetails): @app.task(serializer='pickle') def update(instance_id: str, details: UpdateDetails): + logger.debug(f"*** task update instance: {instance_id}, before lock") with ( new_instance_lock(instance_id), run_instance_hooks(instance_id, "update") as (status, output) ): + logger.debug(f"*** task update instance: {instance_id}") backup_instance(instance_id) data = load_instance_meta(instance_id) if details.service_id: @@ -99,10 +104,6 @@ def update(instance_id: str, details: UpdateDetails): data["last_operation"]["description"] = f"update {instance_id} failed: {output}" save_instance_meta(instance_id, data) return - data['last_operation']["state"] = OperationState.IN_PROGRESS.value - data['last_operation']["description"] = ( - f"update {instance_id} in progress at {time.time()}") - save_instance_meta(instance_id, data) chart_path = get_chart_path(instance_id) values_file = os.path.join(get_plan_path(instance_id), "values.yaml") args = [ @@ -136,10 +137,12 @@ def bind(instance_id: str, details: BindDetails, async_allowed: bool, **kwargs): + logger.debug(f"*** task bind instance: {instance_id}, before lock") with ( new_instance_lock(instance_id), run_instance_hooks(instance_id, "bind") as (status, output) ): + logger.debug(f"*** task bind instance: {instance_id}") backup_instance(instance_id) data = {"binding_id": binding_id, "credentials": {}, "last_operation": {}} if status != 0: @@ -148,7 +151,9 @@ def bind(instance_id: str, save_binding_meta(instance_id, data) return save_binding_meta(instance_id, data) - chart_path = get_chart_path(instance_id) + chart_path, plan_path = ( + get_chart_path(instance_id), get_plan_path(instance_id)) + shutil.copy(f'{plan_path}/bind.yaml', f'{chart_path}/templates') values_file = os.path.join(get_plan_path(instance_id), "values.yaml") args = [ "template", details.context["instance_name"], chart_path, @@ -200,10 +205,12 @@ def bind(instance_id: str, @app.task(serializer='pickle') def unbind(instance_id): + logger.debug(f"*** task unbind instance: {instance_id}, before lock") with ( new_instance_lock(instance_id), - run_instance_hooks(instance_id, "deprovision") as (status, output) + run_instance_hooks(instance_id, "unbind") as (status, output) ): + logger.debug(f"*** task unbind instance: {instance_id}") backup_instance(instance_id) data = load_binding_meta(instance_id) if status != 0: @@ -221,10 +228,12 @@ def unbind(instance_id): @app.task(serializer='pickle') def deprovision(instance_id: str): + logger.debug(f"*** task deprovision instance: {instance_id}, before lock") with ( new_instance_lock(instance_id), run_instance_hooks(instance_id, "deprovision") as (status, output) ): + logger.debug(f"*** task deprovision instance: {instance_id}") backup_instance(instance_id) data = load_instance_meta(instance_id) if status != 0: diff --git a/rootfs/helmbroker/utils.py b/rootfs/helmbroker/utils.py index ff19967..e749e85 100644 --- a/rootfs/helmbroker/utils.py +++ b/rootfs/helmbroker/utils.py @@ -5,13 +5,14 @@ import base64 import copy import logging - +import jsonschema +from urllib.parse import urlparse, parse_qs from contextlib import contextmanager -from redis import client -from .config import REDIS_URL +from redis.client import Redis +from redis.sentinel import Sentinel +from .config import VALKEY_URL logger = logging.getLogger(__name__) - REGISTRY_CONFIG_SUFFIX = '.config/helm/registry.json' REPOSITORY_CACHE_SUFFIX = '.cache/helm/repository' REPOSITORY_CONFIG_SUFFIX = '.config/helm/repository' @@ -42,9 +43,23 @@ def helm(instance_id, *args, output_type="text"): return command("helm", *new_args, output_type=output_type) +def get_valkey_client(): + url = urlparse(VALKEY_URL) + query = parse_qs(url.query) + if 'master_set' in query: + user, host = url.netloc.split("@") + password = user.split(":")[1] + sentinel = Sentinel( + [host.split(":")], + sentinel_kwargs={'password': password}, + password=password, + ) + return sentinel.master_for(query['master_set'][0], socket_timeout=1) + return Redis.from_url(VALKEY_URL) + + def new_instance_lock(instance_id): - redis = client.Redis.from_url(REDIS_URL) - return redis.lock(instance_id) + return get_valkey_client().lock(instance_id) @contextmanager @@ -154,3 +169,52 @@ def _verify_required_parameters(allow_parameters, parameters): if error: error_parameters.add(allow_parameter["name"]) return error_parameters + + +def verify_parameters_by_plan(instance_id, parameters): + """verify parameters allowed or not""" + if not parameters: + return "" + # read schema file + from .database.query import get_plan_schema_path + schema_file = get_plan_schema_path(instance_id) + try: + with open(schema_file, 'r') as f: + schema = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return "" + if not schema: + return "" + # get parameters + if "rawValues" in parameters: + params = yaml.safe_load(base64.b64decode(parameters["rawValues"])) + else: + params = _convert_to_nested_dict(parameters) + # validate schema + try: + jsonschema.validate(params, schema) + except jsonschema.ValidationError as e: + return f"could not validate: {e.message}" + return "" + + +def _convert_to_nested_dict(assignments): + """ + {"a.b.c": "1Gi", "a.b.d": "2Gi"} + -> + {'a': {'b': {'c': '1Gi', 'd': '2Gi'}}} + """ + def set_nested_value(d, keys, value): + if len(keys) == 1: + d[keys[0]] = value + else: + if keys[0] not in d: + d[keys[0]] = {} + set_nested_value(d[keys[0]], keys[1:], value) + result = {} + if isinstance(assignments, dict): + # dict format: {"a.b.c": "1Gi"} + for key_path, value in assignments.items(): + keys = key_path.split('.') + set_nested_value(result, keys, value) + return result diff --git a/rootfs/helmbroker/wsgi.py b/rootfs/helmbroker/wsgi.py index ac89884..206c711 100644 --- a/rootfs/helmbroker/wsgi.py +++ b/rootfs/helmbroker/wsgi.py @@ -1,4 +1,5 @@ import os +import logging from flask import Flask, make_response from openbrokerapi import api, log_util from helmbroker.broker import HelmServiceBroker @@ -17,8 +18,7 @@ def readiness(): if "KUBECONFIG" in os.environ: return "OK" elif "KUBERNETES_SERVICE_PORT" in os.environ and \ - ("KUBERNETES_SERVICE_HOST" in os.environ or - "KUBERNETES_CLUSTER_DOMAIN" in os.environ): + "KUBERNETES_SERVICE_HOST" in os.environ: return "OK" return make_response("kubernetes not available", 500) @@ -27,5 +27,5 @@ def readiness(): catalog_api = api.get_blueprint( HelmServiceBroker(), api.BrokerCredentials(USERNAME, PASSWORD), - log_util.basic_config()) + log_util.basic_config(level=logging.DEBUG if Config.DEBUG else logging.INFO)) application.register_blueprint(catalog_api) diff --git a/rootfs/requirements.txt b/rootfs/requirements.txt index c889eb3..7810104 100644 --- a/rootfs/requirements.txt +++ b/rootfs/requirements.txt @@ -1,7 +1,7 @@ -PyYAML==6.0.1 -gunicorn==22.0.0 -openbrokerapi==4.6.0 -requests==2.31.0 -celery==5.5.0b3 -redis==5.0.8 -jsonschema==4.21.1 +PyYAML==6.0.2 +gunicorn==23.0.0 +openbrokerapi==4.7.1 +requests==2.32.5 +celery==5.5.3 +redis==6.4.0 +jsonschema==4.25.1