Skip to content

Commit e92a2fb

Browse files
committed
chore(celery): remove retrieve_resource task
1 parent 7851c0c commit e92a2fb

17 files changed

Lines changed: 1563 additions & 88 deletions

charts/controller/templates/controller-webhook-register.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ webhooks:
3636
apiGroups: ["apps"]
3737
apiVersions: ["*"]
3838
resources: ["deployments/scale"]
39+
- operations: ["UPDATE"]
40+
apiGroups: ["servicecatalog.k8s.io"]
41+
apiVersions: ["*"]
42+
resources: ["serviceinstances/status", "servicebindings/status"]
3943
timeoutSeconds: 30
4044
---
4145
apiVersion: v1

rootfs/api/admissions.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,51 @@ def handle(self, request: Request) -> bool:
8181
)
8282
)
8383
return True
84+
85+
86+
class ServiceInstancesStatusHandler(BaseHandler):
87+
88+
def detect(self, request: Request) -> bool:
89+
group = request.get("resource", {}).get("group", None)
90+
resource = "/".join([
91+
request.get("resource", {}).get("resource", None),
92+
request.get("subResource", ""),
93+
])
94+
if (group, resource) == ("servicecatalog.k8s.io", "serviceinstances/status"):
95+
return True
96+
return False
97+
98+
def handle(self, request: Request) -> bool:
99+
app_id = request["object"]["metadata"]["namespace"]
100+
name = request["object"]["metadata"]["name"]
101+
status = request["object"]["status"]["lastConditionState"]
102+
resource = models.resource.Resource.objects.filter(
103+
app__id=app_id, name=name).first()
104+
if resource and resource.status != status:
105+
resource.status = status
106+
resource.save(update_fields=["status"])
107+
return True
108+
109+
110+
class ServicebindingsStatusHandler(BaseHandler):
111+
112+
def detect(self, request: Request) -> bool:
113+
group = request.get("resource", {}).get("group", None)
114+
resource = "/".join([
115+
request.get("resource", {}).get("resource", None),
116+
request.get("subResource", ""),
117+
])
118+
if (group, resource) == ("servicecatalog.k8s.io", "servicebindings/status"):
119+
return True
120+
return False
121+
122+
def handle(self, request: Request) -> bool:
123+
app_id = request["object"]["metadata"]["namespace"]
124+
name = request["object"]["metadata"]["name"]
125+
binding = request["object"]["status"]["lastConditionState"]
126+
resource = models.resource.Resource.objects.filter(
127+
app__id=app_id, name=name).first()
128+
if resource and resource.binding != binding:
129+
resource.binding = binding
130+
resource.save(update_fields=["binding"])
131+
return True

rootfs/api/management/commands/measure_loadbalancers.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import uuid
22
import time
33
import logging
4+
import ipaddress
45
from django.utils import timezone
56
from django.core.management.base import BaseCommand
67
from django.conf import settings
@@ -14,21 +15,31 @@
1415
class Command(BaseCommand):
1516
"""Management command for push data to manager"""
1617

18+
def _get_measure_name(self, ip):
19+
address = ipaddress.ip_address(ip)
20+
prefix = "intranet" if address.is_private else "internet"
21+
return f"{prefix}:{address.version}"
22+
1723
def _measure_loadbalancers(self, app_map, timestamp):
1824
stop = timestamp - (timestamp % 3600)
1925
start = stop - 3600
2026
loadbalancers = []
2127
for item in monitor.query_loadbalancer(app_map.keys(), start, stop):
22-
name = item["ip"]
28+
ip = item["ip"]
2329
namespace = item["namespace"]
2430
owner_id = app_map[namespace].owner_id
2531
loadbalancers.append({
2632
"app_id": str(app_map[namespace].uuid),
2733
"owner": owner_id,
28-
"name": name,
34+
"name": self._get_measure_name(ip),
2935
"type": "loadbalancer",
3036
"unit": "number",
3137
"usage": 1,
38+
"kwargs": {
39+
"ip": ip,
40+
"service": item["service"],
41+
"hostname": item["hostname"],
42+
},
3243
"timestamp": start
3344
})
3445
send_measurements.delay(loadbalancers)

rootfs/api/management/commands/measure_networks.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@ def _measure_networks(self, app_map, timestamp):
2121
for namespace, pod_name, rx_bytes, tx_bytes in monitor.query_network_flow(
2222
app_map.keys(), start, stop):
2323
owner_id = app_map[namespace].owner_id
24-
networks.append({
24+
base_measure = {
2525
"app_id": str(app_map[namespace].uuid),
2626
"owner": owner_id,
27-
"name": pod_name,
2827
"type": "network",
2928
"unit": "bytes",
30-
"usage": rx_bytes + tx_bytes,
29+
"kwargs": {
30+
"pod": pod_name,
31+
},
3132
"timestamp": start
32-
})
33+
}
34+
networks.append(base_measure | {"name": "rx", "usage": rx_bytes})
35+
networks.append(base_measure | {"name": "tx", "usage": tx_bytes})
3336
send_measurements.delay(networks)
3437

3538
def handle(self, *args, **options):

rootfs/api/models/app.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from api.utils import get_session
2424
from api.exceptions import AlreadyExists, DryccException, ServiceUnavailable
25-
from api.utils import generate_app_name, apply_tasks, unit_to_bytes, unit_to_millicpu
25+
from api.utils import generate_app_name, apply_tasks
2626
from scheduler import KubeHTTPException, KubeException
2727
from scheduler.resources.pod import DEFAULT_CONTAINER_PORT
2828
from .gateway import Gateway, Route, DEFAULT_HTTP_PORT, DEFAULT_HTTPS_PORT
@@ -589,23 +589,18 @@ def to_measurements(self, timestamp: float):
589589
measurements = []
590590
config = self.config_set.latest()
591591
for container_type, scale in self.structure.items():
592+
plan = config.limits.get(container_type)
592593
measurements.append({
593594
"app_id": str(self.uuid),
594595
"owner": self.owner_id,
595-
"name": container_type,
596-
"type": "cpu",
597-
"unit": "milli",
598-
"usage": unit_to_millicpu(config.cpu.get(container_type)) * scale,
599-
"timestamp": int(timestamp)
600-
})
601-
measurements.append({
602-
"app_id": str(self.uuid),
603-
"owner": self.owner_id,
604-
"name": container_type,
605-
"type": "memory",
606-
"unit": "bytes",
607-
"usage": unit_to_bytes(config.memory.get(container_type)) * scale,
608-
"timestamp": int(timestamp)
596+
"name": plan,
597+
"type": "limits",
598+
"unit": "number",
599+
"usage": scale,
600+
"kwargs": {
601+
"ptype": container_type,
602+
},
603+
"timestamp": int(timestamp),
609604
})
610605
return measurements
611606

rootfs/api/models/resource.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,13 @@ def to_measurements(self, timestamp: float):
201201
return [{
202202
"app_id": str(self.app_id),
203203
"owner": self.owner_id,
204-
"name": self.name,
205-
"type": self.plan,
204+
"name": self.plan,
205+
"type": "resource",
206206
"unit": "number",
207207
"usage": 1,
208+
"kwargs": {
209+
"name": self.name,
210+
},
208211
"timestamp": int(timestamp)
209212
}]
210213

rootfs/api/models/volume.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,13 @@ def to_measurements(self, timestamp: float):
7979
return [{
8080
"app_id": str(self.app_id),
8181
"owner": self.owner_id,
82-
"name": self.name,
82+
"name": self.type,
8383
"type": "volume",
8484
"unit": "bytes",
85-
"usage": unit_to_bytes(self.size) if self.type == "csi" else 0,
85+
"usage": unit_to_bytes(self.size),
86+
"kwargs": {
87+
"name": self.name,
88+
},
8689
"timestamp": int(timestamp)
8790
}]
8891

rootfs/api/settings/celery.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,6 @@ class Config(object):
4646
'exchange': 'controller.priority',
4747
'routing_key': 'controller.priority.high',
4848
},
49-
'api.tasks.retrieve_resource': {
50-
'queue': 'high',
51-
'exchange': 'controller.priority',
52-
'routing_key': 'controller.priority.high',
53-
},
5449
'api.tasks.downstream_model_owner': {
5550
'queue': 'high',
5651
'exchange': 'controller.priority',

rootfs/api/signals.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,13 @@
99
import logging
1010
import urllib.parse
1111
import requests
12-
from datetime import timedelta
1312
from django.conf import settings
1413
from django.db.models.signals import post_delete, post_save
15-
from django.utils.timezone import now
1614
from django.dispatch import receiver
1715
from django.contrib.auth import get_user_model
1816
from rest_framework.authtoken.models import Token
1917
from api.utils import get_session
20-
from api.tasks import retrieve_resource, send_measurements
18+
from api.tasks import send_measurements
2119
from api.models.app import App
2220
from api.models.service import Service
2321
from api.models.gateway import Gateway, DEFAULT_HTTPS_PORT
@@ -252,13 +250,6 @@ def volume_changed_handle(sender, instance: Volume, created=False, update_fields
252250
@receiver(post_save, sender=Resource)
253251
def resource_changed_handle(
254252
sender, instance: Resource, created=False, update_fields=None, **kwargs):
255-
# retrieve_resource
256-
if created or instance.binding == "Binding" or (
257-
update_fields is not None and "plan" in update_fields):
258-
retrieve_resource.apply_async(
259-
args=(instance, ),
260-
eta=now() + timedelta(seconds=30)
261-
)
262253
# measure resources to workflow manager
263254
if settings.WORKFLOW_MANAGER_URL and (
264255
created or (

rootfs/api/tasks.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
# Create your tasks here
2-
import time
32
import uuid
43
import logging
54
from typing import List, Dict
@@ -8,33 +7,9 @@
87

98
from api import manager, models
109
from api.exceptions import ServiceUnavailable
11-
from api.models.resource import Resource
1210
logger = logging.getLogger(__name__)
1311

1412

15-
@shared_task(
16-
retry_kwargs={'max_retries': None}
17-
)
18-
def retrieve_resource(resource):
19-
task_id = uuid.uuid4().hex
20-
signals.request_started.send(sender=task_id)
21-
try:
22-
if not resource.retrieve():
23-
t = time.time() - resource.created.timestamp()
24-
if t < 3600:
25-
retrieve_resource.apply_async(args=(resource, ), countdown=30)
26-
elif t < 3600 * 12:
27-
retrieve_resource.apply_async(args=(resource, ), countdown=1800)
28-
except (Exception, Resource.DoesNotExist) as e:
29-
signals.got_request_exception.send(sender=task_id)
30-
if isinstance(e, Resource.DoesNotExist):
31-
logger.exception("retrieve task not found resource: {}".format(resource.id))
32-
else:
33-
raise e
34-
else:
35-
signals.request_finished.send(sender=task_id)
36-
37-
3813
@shared_task(
3914
autoretry_for=(Exception, ),
4015
retry_backoff=8,

0 commit comments

Comments
 (0)