|
| 1 | +import logging |
| 2 | +import functools |
1 | 3 | from django.db import models |
2 | 4 | from django.contrib.auth import get_user_model |
| 5 | +from scheduler import KubeHTTPException |
| 6 | +from api.exceptions import ServiceUnavailable |
| 7 | +from api.utils import apply_tasks |
3 | 8 | from .app import App |
4 | 9 | from .base import UuidAuditedModel |
5 | 10 |
|
| 11 | + |
6 | 12 | User = get_user_model() |
| 13 | +logger = logging.getLogger(__name__) |
7 | 14 |
|
8 | 15 |
|
9 | 16 | class Blocklist(UuidAuditedModel): |
@@ -41,3 +48,227 @@ def get_blocklist(cls, app: App): |
41 | 48 | class Meta: |
42 | 49 | ordering = ['-created'] |
43 | 50 | unique_together = (("id", "type"),) |
| 51 | + |
| 52 | + def related_resource_deployments(self, app: App): |
| 53 | + "get resource deployments" |
| 54 | + try: |
| 55 | + deployments = self.scheduler.deployment.get(app.id, labels={}).json()['items'] # noqa |
| 56 | + if not deployments: |
| 57 | + deployments = [] |
| 58 | + data = [] |
| 59 | + for d in deployments: |
| 60 | + item = { |
| 61 | + 'name': d['metadata']['name'], |
| 62 | + 'replicas': d['spec'].get("replicas", 0), |
| 63 | + } |
| 64 | + data.append(item) |
| 65 | + data.sort(key=lambda x: x['name']) |
| 66 | + return data |
| 67 | + except KubeHTTPException: |
| 68 | + pass |
| 69 | + except Exception as e: |
| 70 | + err = f'(list resource deployments): {e}' |
| 71 | + logger.info(err) |
| 72 | + raise ServiceUnavailable(err) from e |
| 73 | + |
| 74 | + def related_resource_statefulsets(self, app: App): |
| 75 | + "get resource statefulsets" |
| 76 | + try: |
| 77 | + statefulsets = self.scheduler.statefulset.get(app.id, labels={}).json()['items'] # noqa |
| 78 | + if not statefulsets: |
| 79 | + statefulsets = [] |
| 80 | + data = [] |
| 81 | + for s in statefulsets: |
| 82 | + item = { |
| 83 | + 'name': s['metadata']['name'], |
| 84 | + 'replicas': s['spec'].get("replicas", 0), |
| 85 | + } |
| 86 | + data.append(item) |
| 87 | + data.sort(key=lambda x: x['name']) |
| 88 | + return data |
| 89 | + except KubeHTTPException: |
| 90 | + pass |
| 91 | + except Exception as e: |
| 92 | + err = f'(list resource statefulsets): {e}' |
| 93 | + logger.info(err) |
| 94 | + raise ServiceUnavailable(err) from e |
| 95 | + |
| 96 | + def related_resource_daemonsets(self, app: App): |
| 97 | + "get resource daemonsets" |
| 98 | + try: |
| 99 | + daemonsets = self.scheduler.daemonset.get(app.id, labels={}).json()['items'] # noqa |
| 100 | + if not daemonsets: |
| 101 | + daemonsets = [] |
| 102 | + data = [] |
| 103 | + for d in daemonsets: |
| 104 | + item = { |
| 105 | + 'name': d['metadata']['name'], |
| 106 | + 'affinity': d['spec']['template']['spec'].get('affinity', {}), |
| 107 | + } |
| 108 | + data.append(item) |
| 109 | + data.sort(key=lambda x: x['name']) |
| 110 | + return data |
| 111 | + except KubeHTTPException: |
| 112 | + pass |
| 113 | + except Exception as e: |
| 114 | + err = f'(list resource daemonsets): {e}' |
| 115 | + logger.info(err) |
| 116 | + raise ServiceUnavailable(err) from e |
| 117 | + |
| 118 | + def suspended_state(self, app: App): |
| 119 | + """ |
| 120 | + Get deployments/statefulsets/daemonsets with labels app.kubernetes.io/managed-by=Helm |
| 121 | + Store in the suspended_state field of the app model |
| 122 | + Format: |
| 123 | + {"deployments": [{"name":"sample1", "replicas": 1}], |
| 124 | + "statefulsets": [{"name":"sample1", "replicas": 1}], |
| 125 | + "daemonsets": [{"name":"sample1", "affinity": {}}]} |
| 126 | + """ |
| 127 | + suspended_state = {} |
| 128 | + suspended_state['deployments'] = self.related_resource_deployments(app) |
| 129 | + suspended_state['statefulsets'] = self.related_resource_statefulsets(app) |
| 130 | + suspended_state['daemonsets'] = self.related_resource_daemonsets(app) |
| 131 | + return suspended_state |
| 132 | + |
| 133 | + def scale_resource_deployments(self, app: App, deployment_name: str, replicas=0): |
| 134 | + """scale deployments""" |
| 135 | + try: |
| 136 | + deployment = self.scheduler.deployment.get(app.id, deployment_name).json() |
| 137 | + self.scheduler.scales.update(app.id, deployment_name, replicas, deployment) |
| 138 | + except KubeHTTPException: |
| 139 | + pass |
| 140 | + except Exception as e: |
| 141 | + err = f'(scale resource deployments): {e}' |
| 142 | + logger.info(err) |
| 143 | + raise ServiceUnavailable(err) from e |
| 144 | + |
| 145 | + def scale_resource_statefulsets(self, app: App, statefulset_name: str, replicas=0): |
| 146 | + """scale statefulsets""" |
| 147 | + try: |
| 148 | + self.scheduler.statefulset.get(app.id, statefulset_name).json() |
| 149 | + manifest = { |
| 150 | + 'spec': { |
| 151 | + 'persistentVolumeClaimRetentionPolicy': { |
| 152 | + 'whenScaled': 'Retain' |
| 153 | + }, |
| 154 | + 'replicas': replicas |
| 155 | + } |
| 156 | + } |
| 157 | + self.scheduler.statefulset.patch(app.id, statefulset_name, manifest) |
| 158 | + except KubeHTTPException: |
| 159 | + pass |
| 160 | + except Exception as e: |
| 161 | + err = f'(scale resource statefulsets): {e}' |
| 162 | + logger.info(err) |
| 163 | + raise ServiceUnavailable(err) from e |
| 164 | + |
| 165 | + def scale_resource_daemonsets(self, app: App, daemonset_name: str, manifest: dict): |
| 166 | + """set affinity""" |
| 167 | + try: |
| 168 | + self.scheduler.daemonset.get(app.id, daemonset_name).json() |
| 169 | + self.scheduler.daemonset.patch(app.id, daemonset_name, manifest) |
| 170 | + except KubeHTTPException: |
| 171 | + pass |
| 172 | + except Exception as e: |
| 173 | + err = f'(scale resource daemonsets): {e}' |
| 174 | + logger.info(err) |
| 175 | + raise ServiceUnavailable(err) from e |
| 176 | + |
| 177 | + def scale_resources(self, app: App, suspended_state: dict, scale_type="block"): |
| 178 | + "scale resources tasks" |
| 179 | + tasks = [] |
| 180 | + deployments = suspended_state.get('deployments', []) |
| 181 | + if not deployments: |
| 182 | + deployments = [] |
| 183 | + for d in deployments: |
| 184 | + if scale_type == "unblock": |
| 185 | + replicas = d['replicas'] |
| 186 | + else: |
| 187 | + replicas = 0 |
| 188 | + tasks.append(( |
| 189 | + functools.partial( |
| 190 | + self.scale_resource_deployments, |
| 191 | + app=app, |
| 192 | + deployment_name=d['name'], |
| 193 | + replicas=replicas |
| 194 | + ), |
| 195 | + lambda future, name=d["name"]: app.log( |
| 196 | + f'{scale_type} scale deployment {name} callback: {future.result()}', |
| 197 | + ) |
| 198 | + )) |
| 199 | + |
| 200 | + statefulsets = suspended_state.get('statefulsets', []) |
| 201 | + if not statefulsets: |
| 202 | + statefulsets = [] |
| 203 | + for s in statefulsets: |
| 204 | + if scale_type == "unblock": |
| 205 | + replicas = s['replicas'] |
| 206 | + else: |
| 207 | + replicas = 0 |
| 208 | + tasks.append(( |
| 209 | + functools.partial( |
| 210 | + self.scale_resource_statefulsets, |
| 211 | + app=app, |
| 212 | + statefulset_name=s['name'], |
| 213 | + replicas=replicas |
| 214 | + ), |
| 215 | + lambda future, name=s["name"]: app.log( |
| 216 | + f'{scale_type} scale statefulset {name} callback: {future.result()}', |
| 217 | + ) |
| 218 | + )) |
| 219 | + |
| 220 | + daemonsets = suspended_state.get('daemonsets', []) |
| 221 | + if not daemonsets: |
| 222 | + daemonsets = [] |
| 223 | + for d in daemonsets: |
| 224 | + if scale_type == "unblock": |
| 225 | + manifest = { |
| 226 | + "spec": { |
| 227 | + "template": { |
| 228 | + "spec": { |
| 229 | + "affinity": d['affinity'] |
| 230 | + } |
| 231 | + } |
| 232 | + } |
| 233 | + } |
| 234 | + else: |
| 235 | + manifest = { |
| 236 | + "spec": { |
| 237 | + "template": { |
| 238 | + "spec": { |
| 239 | + "affinity": { |
| 240 | + "nodeAffinity": { |
| 241 | + "requiredDuringSchedulingIgnoredDuringExecution": { |
| 242 | + "nodeSelectorTerms": [{ |
| 243 | + "matchExpressions": [{ |
| 244 | + "key": "kubernetes.io/hostname", |
| 245 | + "operator": "In", |
| 246 | + "values": [ |
| 247 | + "nohostname" |
| 248 | + ] |
| 249 | + }] |
| 250 | + }] |
| 251 | + } |
| 252 | + } |
| 253 | + } |
| 254 | + } |
| 255 | + } |
| 256 | + } |
| 257 | + } |
| 258 | + tasks.append(( |
| 259 | + functools.partial( |
| 260 | + self.scale_resource_daemonsets, |
| 261 | + app=app, |
| 262 | + daemonset_name=d['name'], |
| 263 | + manifest=manifest |
| 264 | + ), |
| 265 | + lambda future, name=d["name"]: app.log( |
| 266 | + f'{scale_type} scale daemonset {name} callback: {future.result()}', |
| 267 | + ) |
| 268 | + )) |
| 269 | + try: |
| 270 | + apply_tasks(tasks) |
| 271 | + except Exception as e: |
| 272 | + err = f'({scale_type} scale resources): {e}' |
| 273 | + logger.info(err) |
| 274 | + raise ServiceUnavailable(err) from e |
0 commit comments