|
3 | 3 | import logging |
4 | 4 | import random |
5 | 5 | from datetime import timedelta |
| 6 | +from asgiref.sync import async_to_sync |
6 | 7 | from django.utils import timezone |
7 | 8 | from django.core.management.base import BaseCommand |
8 | 9 | from django.conf import settings |
9 | | -from api.models.volume import Volume |
| 10 | +from api import monitor |
| 11 | +from api.models.app import App |
10 | 12 | from api.tasks import send_usage |
11 | 13 |
|
12 | 14 | logger = logging.getLogger(__name__) |
|
15 | 17 | class Command(BaseCommand): |
16 | 18 | """Management command for push data to manager""" |
17 | 19 |
|
| 20 | + def _upload_volume_usage(self, start_time, app_map, timestamp): |
| 21 | + stop = timestamp - (timestamp % 3600) |
| 22 | + start = stop - 3600 |
| 23 | + volumes = [] |
| 24 | + for item in async_to_sync(monitor.query_volume_size)(app_map.keys(), start, stop): # noqa |
| 25 | + metric = item["metric"] |
| 26 | + _, value = item["value"] |
| 27 | + volumes.append({ |
| 28 | + "app_id": str(app_map[metric['namespace']].uuid), |
| 29 | + "owner": app_map[metric['namespace']].owner_id, |
| 30 | + "type": "volume", |
| 31 | + "unit": "bytes", |
| 32 | + "name": metric["storageclass"], |
| 33 | + "usage": value, |
| 34 | + "kwargs": { |
| 35 | + "persistentvolumeclaim": metric['persistentvolumeclaim'], |
| 36 | + }, |
| 37 | + "timestamp": start |
| 38 | + }) |
| 39 | + send_usage.apply_async( |
| 40 | + args=(volumes,), eta=start_time + timedelta(seconds=random.randint(1, 1800)) |
| 41 | + ) |
| 42 | + |
18 | 43 | def handle(self, *args, **options): |
19 | 44 | if settings.WORKFLOW_MANAGER_URL: |
20 | | - start_time, timestamp, task_id = timezone.now(), time.time(), uuid.uuid4().hex |
21 | | - logger.info(f"pushing {task_id} volumes to workflow_manager when {start_time}") |
22 | | - volume_list = [] |
23 | | - for volume in Volume.objects.all(): |
24 | | - volume_list.extend(volume.to_usages(timestamp)) |
25 | | - if len(volume_list) % 1000 == 0: |
26 | | - send_usage.apply_async( |
27 | | - args=(volume_list,), |
28 | | - eta=start_time + timedelta(seconds=random.randint(1, 1800)) |
29 | | - ) |
30 | | - volume_list = [] |
31 | | - if len(volume_list) > 0: |
32 | | - send_usage.apply_async( |
33 | | - args=(volume_list,), |
34 | | - eta=start_time + timedelta(seconds=random.randint(1, 1800)) |
35 | | - ) |
| 45 | + start_time, timestamp, task_id = timezone.now(), int(time.time()), uuid.uuid4().hex |
| 46 | + logger.info(f"pushing {task_id} volumes to workflow_manager when {timezone.now()}") |
| 47 | + app_map = {} |
| 48 | + for app in App.objects.all(): |
| 49 | + app_map[app.id] = app |
| 50 | + if len(app_map) % 1000 == 0: |
| 51 | + self._upload_volume_usage(start_time, app_map, timestamp) |
| 52 | + app_map = {} |
| 53 | + if len(app_map) > 0: |
| 54 | + self._upload_volume_usage(start_time, app_map, timestamp) |
36 | 55 | logger.info(f"pushed {task_id} volumes to workflow_manager when {timezone.now()}") |
37 | 56 | self.stdout.write("done") |
0 commit comments