-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmeasure_app.py
More file actions
113 lines (105 loc) · 4.78 KB
/
measure_app.py
File metadata and controls
113 lines (105 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import uuid
import time
import logging
from contextlib import closing
from django.utils import timezone
from django.core.management.base import BaseCommand
from django.conf import settings
from api.tasks import measure_networks, measure_instances
from api.models import App
from api.utils import get_influxdb_client
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class Command(BaseCommand):
    """Management command that pushes hourly app usage data to the manager.

    For batches of ``App`` rows it queries the ``kubernetes`` InfluxDB bucket
    for pod network counters and container counts over the last complete
    clock hour, then enqueues the results through the ``measure_networks``
    and ``measure_instances`` tasks.
    """

    # Maximum number of apps whose namespaces are OR-ed into a single query.
    _BATCH_SIZE = 1000

    @staticmethod
    def _hour_window(timestamp):
        """Return ``(start, stop)`` Unix seconds of the last full hour.

        ``stop`` is *timestamp* truncated down to a multiple of 3600 and
        ``start`` is exactly one hour earlier.
        """
        stop = int(timestamp)
        stop -= stop % 3600
        return stop - 3600, stop

    @staticmethod
    def _namespace_filter(app_map):
        """Return a Flux predicate matching any namespace keyed in *app_map*.

        Each key is interpolated with an f-string; a plain string literal
        here would emit the text ``{app_id}`` verbatim and match nothing.
        """
        return ' or '.join(
            f'r["namespace"] == "{app_id}"' for app_id in app_map)

    def _build_query_networks_flux(self, app_map, timestamp):
        """Build the Flux query for per-pod network counters.

        The query selects ``kubernetes_pod_network`` rows for the apps in
        *app_map* within the last full hour before *timestamp*, pivots the
        fields into columns, applies ``increase`` to the byte/error counters
        and keeps the last row of each table.

        :param app_map: mapping whose keys are the namespaces to query.
        :param timestamp: Unix time in seconds (int or float).
        :return: the Flux query text.
        """
        start, stop = self._hour_window(timestamp)
        namespace_range = self._namespace_filter(app_map)
        return f'''
        from(bucket: "kubernetes")
            |> range(start: {start}, stop: {stop})
            |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_network"
                and ({namespace_range}))
            |> pivot(
                rowKey:["_time"],
                columnKey: ["_field"],
                valueColumn: "_value"
            )
            |> increase(columns: ["rx_bytes", "tx_bytes", "tx_errors", "rx_errors"])
            |> last(column: "_time")
        '''

    def _build_query_instances_flux(self, app_map, timestamp):
        """Build the Flux query for per-container instance counts.

        The query selects ``cpu_usage_core_nanoseconds`` samples of
        ``kubernetes_pod_container`` for the apps in *app_map* within the
        last full hour before *timestamp*, counts the samples per
        ``(_time, namespace, container_name)``, regroups by
        ``(namespace, container_name)`` and returns the minimum of the three
        largest counts.
        NOTE(review): the count per timestamp presumably equals the number
        of running containers of that name -- confirm against the collector.

        :param app_map: mapping whose keys are the namespaces to query.
        :param timestamp: Unix time in seconds (int or float).
        :return: the Flux query text.
        """
        start, stop = self._hour_window(timestamp)
        namespace_range = self._namespace_filter(app_map)
        return f'''
        from(bucket: "kubernetes")
            |> range(start: {start}, stop: {stop})
            |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container"
                and r["_field"]=="cpu_usage_core_nanoseconds"
                and ({namespace_range}))
            |> group(columns: ["_time", "namespace", "container_name"])
            |> count(column: "_value")
            |> group(columns: ["namespace", "container_name"])
            |> top(n: 3)
            |> min(column: "_value")
        '''

    def _measure_networks(self, app_map, timestamp):
        """Query network usage for *app_map* and enqueue ``measure_networks``.

        :param app_map: mapping of namespace (app id) to ``App`` instance.
        :param timestamp: Unix time in seconds attached to every entry.
        """
        networks = []
        with closing(get_influxdb_client()) as client:
            with closing(client.query_api()) as query_api:
                with closing(query_api.query_stream(
                    self._build_query_networks_flux(app_map, timestamp))
                ) as records:
                    for record in records:
                        app_id = record["namespace"]
                        networks.append({
                            "app_id": app_id,
                            "owner_id": app_map[app_id].owner_id,
                            "pod_name": record["pod_name"],
                            "rx_bytes": record["rx_bytes"],
                            "tx_bytes": record["tx_bytes"],
                            "timestamp": timestamp
                        })
        measure_networks.delay(networks)

    def _measure_instances(self, app_map, timestamp):
        """Query container counts for *app_map* and enqueue ``measure_instances``.

        :param app_map: mapping of namespace (app id) to ``App`` instance.
        :param timestamp: Unix time in seconds attached to every entry.
        """
        instances = []
        with closing(get_influxdb_client()) as client:
            with closing(client.query_api()) as query_api:
                with closing(query_api.query_stream(
                    self._build_query_instances_flux(app_map, timestamp))
                ) as records:
                    for record in records:
                        app_id = record["namespace"]
                        # Drop the first "-<app_id>" occurrence from the
                        # container name to obtain the container type.
                        container_type = record["container_name"].replace(
                            f"-{app_id}", "", 1)
                        instances.append({
                            "app_id": app_id,
                            "owner_id": app_map[app_id].owner_id,
                            "container_type": container_type,
                            "container_count": record["_value"],
                            "timestamp": timestamp
                        })
        measure_instances.delay(instances)

    def _measure_batch(self, app_map, timestamp):
        """Run both measurements for one batch of apps."""
        self._measure_networks(app_map, timestamp)
        self._measure_instances(app_map, timestamp)

    def handle(self, *args, **options):
        """Measure every app in batches, then write ``done`` to stdout.

        Nothing is measured when ``settings.WORKFLOW_MANAGER_URL`` is None.
        """
        if settings.WORKFLOW_MANAGER_URL is not None:
            timestamp = time.time()
            task_id = uuid.uuid4().hex
            logger.info(
                "pushing %s networks to workflow_manager when %s",
                task_id, timezone.now())
            app_map = {}
            for app in App.objects.all():
                app_map[app.id] = app
                # Flush a full batch so a single query stays bounded in size.
                if len(app_map) % self._BATCH_SIZE == 0:
                    self._measure_batch(app_map, timestamp)
                    app_map = {}
            # Flush the remaining, partially filled batch.
            if app_map:
                self._measure_batch(app_map, timestamp)
            logger.info(
                "pushed %s networks to workflow_manager when %s",
                task_id, timezone.now())
        self.stdout.write("done")