Skip to content

Commit 1e60359

Browse files
committed
feat(controller): add drycc annotation to Kubernetes
1 parent 01d6d6c commit 1e60359

15 files changed

Lines changed: 162 additions & 133 deletions

File tree

rootfs/api/models/app.py

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def save(self, *args, **kwargs):
133133
self.release_set.latest()
134134
except Release.DoesNotExist:
135135
try:
136-
if self.scheduler().ns.get(self.id).status_code == 200:
136+
if self.scheduler.ns.get(self.id).status_code == 200:
137137
# Namespace already exists
138138
err = "{} already exists as a namespace in this kuberenetes setup".format(self.id) # noqa
139139
self.log(err, logging.INFO)
@@ -155,6 +155,17 @@ def lock(self):
155155
def ptypes(self):
156156
return list(self.structure.keys())
157157

158+
@property
159+
def scheduler(self):
160+
"""
161+
Override @Base.AuditedModel.scheduler;
162+
since the app itself doesn't have an app object context,
163+
directly reference using ID instead.
164+
"""
165+
scheduler = super(App, self).scheduler
166+
scheduler.metadata["annotations"]["drycc.cc/project_id"] = str(self.id)
167+
return scheduler
168+
158169
def check_ptypes(self, ptypes: set):
159170
"""
160171
check available procfile types
@@ -199,10 +210,10 @@ def create(self, *args, **kwargs): # noqa
199210
self.log('creating Namespace {} and services'.format(namespace), level=logging.DEBUG)
200211
# Create essential resources
201212
try:
202-
self.scheduler().ns.get(namespace)
213+
self.scheduler.ns.get(namespace)
203214
except KubeException:
204215
try:
205-
self.scheduler().ns.create(namespace)
216+
self.scheduler.ns.create(namespace)
206217
except KubeException as e:
207218
raise ServiceUnavailable('Could not create the Namespace in Kubernetes') from e
208219
try:
@@ -220,15 +231,15 @@ def delete(self, *args, **kwargs):
220231
self.log("deleting environment")
221232
try:
222233
# check if namespace exists
223-
self.scheduler().ns.get(self.id)
234+
self.scheduler.ns.get(self.id)
224235

225236
try:
226-
self.scheduler().ns.delete(self.id)
237+
self.scheduler.ns.delete(self.id)
227238

228239
# wait 30 seconds for termination
229240
for _ in range(30):
230241
try:
231-
self.scheduler().ns.get(self.id)
242+
self.scheduler.ns.get(self.id)
232243
except KubeHTTPException as e:
233244
# only break out on a 404
234245
if e.response.status_code == 404:
@@ -254,7 +265,7 @@ def restart(self, **kwargs): # noqa
254265
tasks = [
255266
(
256267
functools.partial(
257-
self.scheduler().deployment.restart,
268+
self.scheduler.deployment.restart,
258269
self.id,
259270
deployment
260271
),
@@ -293,7 +304,7 @@ def pipeline(self, release, ptypes, force_deploy=False):
293304
envs=self._build_env_vars(release, run['ptype']),
294305
)
295306
state, labels = 'initializing', {'job-name': job_name}
296-
for count, state in enumerate(self.scheduler().pod.watch(
307+
for count, state in enumerate(self.scheduler.pod.watch(
297308
self.id, labels, settings.DRYCC_PILELINE_RUN_TIMEOUT)):
298309
self.log(f"{prefix} waiting for pipeline.run: {state} * {count}")
299310
if state != 'down':
@@ -371,7 +382,7 @@ def clean(self, release=None, ptypes=None):
371382
labels = {'heritage': 'drycc'}
372383
if ptypes:
373384
labels["type__in"] = ptypes
374-
resource_apis = [self.scheduler().deployments, self.scheduler().secret]
385+
resource_apis = [self.scheduler.deployments, self.scheduler.secret]
375386
for api in resource_apis:
376387
resources = api.get(self.id, labels=labels).json()["items"]
377388
if resources is not None:
@@ -414,7 +425,7 @@ def pod_name(size=5, chars=string.ascii_lowercase + string.digits):
414425
try:
415426
# create application config and build the pod manifest
416427
self.set_application_config(release, PTYPE_RUN)
417-
self.scheduler().job.create(self.id, name, image, command, args, **kwargs)
428+
self.scheduler.job.create(self.id, name, image, command, args, **kwargs)
418429
except Exception as e:
419430
err = '{} ({}): {}'.format(name, PTYPE_RUN, e)
420431
raise ServiceUnavailable(err) from e
@@ -431,7 +442,7 @@ def get_command_and_args(pod, container_name):
431442
return command, args
432443
result = []
433444
try:
434-
pod = self.scheduler().pod.get(self.id, pod_name).json()
445+
pod = self.scheduler.pod.get(self.id, pod_name).json()
435446
if pod["status"]['phase'] == 'Pending':
436447
statuses = pod["spec"]["containers"]
437448
else:
@@ -462,9 +473,9 @@ def list_pods(self, *args, **kwargs):
462473
labels = self._scheduler_filter(**kwargs)
463474
# in case a singular pod is requested
464475
if 'name' in kwargs:
465-
pods = [self.scheduler().pod.get(self.id, kwargs['name']).json()]
476+
pods = [self.scheduler.pod.get(self.id, kwargs['name']).json()]
466477
else:
467-
pods = self.scheduler().pod.get(self.id, labels=labels).json()['items']
478+
pods = self.scheduler.pod.get(self.id, labels=labels).json()['items']
468479
if not pods:
469480
pods = []
470481
data = []
@@ -475,7 +486,7 @@ def list_pods(self, *args, **kwargs):
475486
else:
476487
started = str(
477488
datetime.now(timezone.utc).strftime(settings.DRYCC_DATETIME_FORMAT))
478-
state = str(self.scheduler().pod.state(p))
489+
state = str(self.scheduler.pod.state(p))
479490
if p['status']['phase'] != 'Pending':
480491
ready = len([1 for s in p["status"]["containerStatuses"] if s['ready']])
481492
restarts = sum([s['restartCount'] for s in p["status"]["containerStatuses"]])
@@ -508,9 +519,9 @@ def delete_pod(self, **kwargs):
508519
pod_name = kwargs.get('pod_name')
509520
try:
510521
# make sure the pod is manageed by drycc
511-
pod = self.scheduler().pod.get(self.id, pod_name).json()
522+
pod = self.scheduler.pod.get(self.id, pod_name).json()
512523
if pod['metadata']['labels'].get("heritage") == "drycc":
513-
self.scheduler().pod.delete(self.id, pod_name)
524+
self.scheduler.pod.delete(self.id, pod_name)
514525
except KubeHTTPException as e:
515526
# Sometimes k8s will manage to remove the pod from under us
516527
if e.response.status_code != 404:
@@ -519,7 +530,7 @@ def delete_pod(self, **kwargs):
519530
def describe_deployment(self, deployment_name):
520531
result = []
521532
try:
522-
deployment = self.scheduler().deployment.get(self.id, deployment_name).json()
533+
deployment = self.scheduler.deployment.get(self.id, deployment_name).json()
523534
for container in deployment["spec"]["template"]['spec']["containers"]:
524535
limits = container.get("resources", {}).get("limits", {})
525536
result.append({
@@ -545,9 +556,9 @@ def list_deployments(self, *args, **kwargs):
545556
labels = self._scheduler_filter(**kwargs)
546557
# in case a singular deployment is requested
547558
if 'name' in kwargs:
548-
deployments = [self.scheduler().deployment.get(self.id, kwargs['name']).json()]
559+
deployments = [self.scheduler.deployment.get(self.id, kwargs['name']).json()]
549560
else:
550-
deployments = self.scheduler().deployment.get(self.id, labels=labels).json()['items'] # noqa
561+
deployments = self.scheduler.deployment.get(self.id, labels=labels).json()['items'] # noqa
551562
if not deployments:
552563
deployments = []
553564
data = []
@@ -588,7 +599,7 @@ def list_events(self, ref_kind, ref_name, *args, **kwargs):
588599
"regarding.name": ref_name
589600
}
590601
kwargs["fields"] = fields
591-
events = self.scheduler().events.get(self.id, **kwargs).json()['items'] # noqa
602+
events = self.scheduler.events.get(self.id, **kwargs).json()['items'] # noqa
592603
data = []
593604
for e in events:
594605
item = {
@@ -622,16 +633,16 @@ def autoscale(self, proc_type, autoscale):
622633

623634
try:
624635
# get the target for autoscaler, in this case Deployment
625-
self.scheduler().hpa.get(self.id, name)
636+
self.scheduler.hpa.get(self.id, name)
626637
if autoscale is None:
627-
self.scheduler().hpa.delete(self.id, name)
638+
self.scheduler.hpa.delete(self.id, name)
628639
else:
629-
self.scheduler().hpa.update(
640+
self.scheduler.hpa.update(
630641
self.id, name, proc_type, target, **autoscale
631642
)
632643
except KubeHTTPException as e:
633644
if e.response.status_code == 404:
634-
self.scheduler().hpa.create(
645+
self.scheduler.hpa.create(
635646
self.id, name, proc_type, target, **autoscale
636647
)
637648
else:
@@ -649,16 +660,16 @@ def image_pull_secret(self, namespace, ptype, registry, image):
649660
elif create:
650661
data = {'.dockerconfigjson': docker_config}
651662
try:
652-
self.scheduler().secret.get(namespace, name)
663+
self.scheduler.secret.get(namespace, name)
653664
except KubeHTTPException:
654-
self.scheduler().secret.create(
665+
self.scheduler.secret.create(
655666
namespace,
656667
name,
657668
data,
658669
secret_type='kubernetes.io/dockerconfigjson'
659670
)
660671
else:
661-
self.scheduler().secret.update(
672+
self.scheduler.secret.update(
662673
namespace,
663674
name,
664675
data,
@@ -674,7 +685,7 @@ def state_to_k8s(self):
674685
return
675686
ptypes = set()
676687
for ptype, scale in self.structure.items():
677-
response = self.scheduler().deployment.get(
688+
response = self.scheduler.deployment.get(
678689
self.id, self._get_deployment_name(ptype),
679690
ignore_exception=True)
680691
if response.status_code == 404 and scale > 0:
@@ -710,11 +721,11 @@ def set_application_config(self, release, ptype):
710721

711722
secret_name = "{}-{}-{}-env".format(self.id, ptype, release.version_name)
712723
try:
713-
self.scheduler().secret.get(self.id, secret_name)
724+
self.scheduler.secret.get(self.id, secret_name)
714725
except KubeHTTPException:
715-
self.scheduler().secret.create(self.id, secret_name, secrets_env, labels=labels)
726+
self.scheduler.secret.create(self.id, secret_name, secrets_env, labels=labels)
716727
else:
717-
self.scheduler().secret.update(self.id, secret_name, secrets_env, labels=labels)
728+
self.scheduler.secret.update(self.id, secret_name, secrets_env, labels=labels)
718729

719730
def to_measurements(self, timestamp: float):
720731
measurements = []
@@ -766,15 +777,15 @@ def _mount(self, user, volume, app_settings, structure=None):
766777
volume for volume in volumes if scale_type in volume.path.keys()]
767778
data = self._gather_app_settings(
768779
release, app_settings, scale_type, replicas, volumes=scale_type_volumes)
769-
deployment = self.scheduler().deployment.get(
780+
deployment = self.scheduler.deployment.get(
770781
self.id, self._get_deployment_name(scale_type)).json()
771782
spec_annotations = deployment['spec']['template']['metadata'].get(
772783
'annotations', {})
773784
self.set_application_config(release, scale_type)
774785
# gather volume proc types to be deployed
775786
tasks.append((
776787
functools.partial(
777-
self.scheduler().deployment.patch,
788+
self.scheduler.deployment.patch,
778789
namespace=self.id,
779790
name=self._get_deployment_name(scale_type),
780791
image=release.get_deploy_image(scale_type),
@@ -808,7 +819,7 @@ def _deploy(self, deploys, ptypes, prev_release,
808819
self.set_application_config(release, scale_type)
809820
tasks.append((
810821
functools.partial(
811-
self.scheduler().deploy,
822+
self.scheduler.deploy,
812823
namespace=self.id,
813824
name=self._get_deployment_name(scale_type),
814825
image=release.get_deploy_image(scale_type),
@@ -906,7 +917,7 @@ def _scale_pods(self, scale_types, release, app_settings):
906917
# gather all proc types to be deployed
907918
tasks.append((
908919
functools.partial(
909-
self.scheduler().scale,
920+
self.scheduler.scale,
910921
namespace=self.id,
911922
name=self._get_deployment_name(scale_type),
912923
image=release.get_deploy_image(scale_type),
@@ -1064,7 +1075,7 @@ def _check_deployment_in_progress(self, deploys, force_deploy=False):
10641075
for scale_type, kwargs in deploys.items():
10651076
name = self._get_deployment_name(scale_type)
10661077
# Is there an existing deployment in progress?
1067-
in_progress, deploy_okay = self.scheduler().deployment.in_progress(
1078+
in_progress, deploy_okay = self.scheduler.deployment.in_progress(
10681079
self.id, name, kwargs.get("deploy_timeout"), kwargs.get("deploy_batches"),
10691080
kwargs.get("replicas"), kwargs.get("tags")
10701081
)
@@ -1148,7 +1159,7 @@ def _get_private_registry_config(self, ptype, image, registry=None):
11481159
username = registry.get('username')
11491160
password = registry.get('password')
11501161
elif settings.REGISTRY_LOCATION == 'off-cluster':
1151-
secret = self.scheduler().secret.get(
1162+
secret = self.scheduler.secret.get(
11521163
settings.WORKFLOW_NAMESPACE, 'controller-creds').json()
11531164
hostname = secret['data']['registry-host']
11541165
if hostname == '':

rootfs/api/models/base.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import uuid
22
import string
33
import random
4-
import importlib
54
from datetime import timedelta
65
from functools import partial
76
from django.db import models
@@ -11,7 +10,7 @@
1110
from django.utils.translation import gettext_lazy as _
1211

1312

14-
from api.utils import validate_json
13+
from api.utils import validate_json, get_scheduler
1514

1615
token_manager_oauth_schema = {
1716
"$schema": "http://json-schema.org/schema#",
@@ -48,10 +47,14 @@ class Meta:
4847
"""Mark :class:`AuditedModel` as abstract."""
4948
abstract = True
5049

51-
@classmethod
52-
def scheduler(cls):
53-
mod = importlib.import_module(settings.SCHEDULER_MODULE)
54-
return mod.SchedulerClient(settings.SCHEDULER_URL, settings.K8S_API_VERIFY_TLS)
50+
@property
51+
def scheduler(self):
52+
annotations = {}
53+
if hasattr(self, 'app'):
54+
annotations["drycc.cc/project_id"] = str(self.app.id)
55+
if hasattr(self, 'owner'):
56+
annotations["drycc.cc/account_id"] = str(self.owner.id)
57+
return get_scheduler(metadata={"annotations": annotations})
5558

5659

5760
class UuidAuditedModel(AuditedModel):

rootfs/api/models/certificate.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,14 +207,14 @@ def attach_in_kubernetes(self, domain):
207207
'tls.key': self.key
208208
}
209209

210-
secret = self.scheduler().secret.get(namespace, self.certname).json()['data']
210+
secret = self.scheduler.secret.get(namespace, self.certname).json()['data']
211211
except KubeException:
212-
self.scheduler().secret.create(namespace, self.certname, data)
212+
self.scheduler.secret.create(namespace, self.certname, data)
213213
else:
214214
# update cert secret to the TLS Ingress format if required
215215
if secret != data:
216216
try:
217-
self.scheduler().secret.update(namespace, self.certname, data)
217+
self.scheduler.secret.update(namespace, self.certname, data)
218218
except KubeException as e:
219219
msg = 'There was a problem updating the certificate secret ' \
220220
'{} for {}'.format(self.certname, namespace)
@@ -232,8 +232,8 @@ def detach(self, *args, **kwargs):
232232
if len(self.domains) == 0:
233233
try:
234234
# We raise an exception when a secret doesn't exist
235-
self.scheduler().secret.get(namespace, self.certname)
236-
self.scheduler().secret.delete(namespace, self.certname)
235+
self.scheduler.secret.get(namespace, self.certname)
236+
self.scheduler.secret.delete(namespace, self.certname)
237237
except KubeException as e:
238238
raise ServiceUnavailable(
239239
"Could not delete certificate secret {} for application {}".format(

rootfs/api/models/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ def _update_tags(self, previous_config, replace_ptypes=[]):
236236
'{} does not exist under {}'.format(ptype, 'tags'))
237237
data.pop(ptype)
238238
else:
239-
if not self.scheduler().node.get(labels=values).json()['items']:
239+
if not self.scheduler.node.get(labels=values).json()['items']:
240240
labels = ['{}={}'.format(key, value) for key, value in values.items()]
241241
message = 'No nodes matched the provided labels: {}'.format(', '.join(labels))
242242
# Find out if there are any other tags around

0 commit comments

Comments
 (0)