Skip to content

Commit 9b8abd1

Browse files
committed
feat(influxdb): review influxdb code
1 parent 4e1dec1 commit 9b8abd1

4 files changed

Lines changed: 231 additions & 90 deletions

File tree

rootfs/api/influxdb.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import threading
2+
from typing import Iterator
3+
from contextlib import closing
4+
from django.conf import settings
5+
from influxdb_client import InfluxDBClient
6+
from influxdb_client.client.flux_table import FluxRecord
7+
8+
local = threading.local()
9+
10+
11+
def _get_influxdb_client() -> InfluxDBClient:
    """Return the InfluxDB client stored for the calling thread.

    The client is kept on the module-level ``threading.local`` object. The
    first call made from a thread builds it from ``settings.INFLUXDB_URL``,
    ``settings.INFLUXDB_TOKEN`` and ``settings.INFLUXDB_ORG``; later calls from
    the same thread return that same instance.
    """
    try:
        return local.influxdb_client
    except AttributeError:
        # Nothing stored for this thread yet: build the client and remember it.
        local.influxdb_client = InfluxDBClient(
            url=settings.INFLUXDB_URL,
            token=settings.INFLUXDB_TOKEN,
            org=settings.INFLUXDB_ORG
        )
        return local.influxdb_client
19+
20+
21+
def _query_stream(flux_script: str) -> Iterator[FluxRecord]:
    """Execute ``flux_script`` and lazily yield the records of its result.

    Args:
        flux_script: Complete Flux query text to run.

    Yields:
        Every record produced by ``query_api.query_stream(flux_script)``.
    """
    # The client is cached per thread by _get_influxdb_client(). It must stay
    # open here: closing it after a single query would leave a closed client
    # in the cache and hand it to the next query issued from the same thread.
    client = _get_influxdb_client()
    with closing(client.query_api()) as query_api:
        # Closing the record stream releases it even when the consumer stops
        # iterating early.
        with closing(query_api.query_stream(flux_script)) as records:
            yield from records
26+
27+
28+
def query_container_count(
        namespaces: Iterator[str], start: int, stop: int) -> Iterator[FluxRecord]:
    """Yield container-count records for the given namespaces.

    The Flux query reads ``cpu_usage_core_nanoseconds`` points of the
    ``kubernetes_pod_container`` measurement, counts them per
    (``_time``, ``namespace``, ``container_name``), regroups by
    (``namespace``, ``container_name``) and reduces each group with
    ``top(n: 3)`` followed by ``min()``.

    Args:
        namespaces: Namespace names; a record matches when its ``namespace``
            equals any of them.
        start: Value interpolated as the ``start`` argument of ``range()``.
        stop: Value interpolated as the ``stop`` argument of ``range()``.

    Yields:
        The records returned by the query. Nothing is yielded, and no query is
        sent, when ``namespaces`` is empty.
    """
    # The f-prefix is required so that each namespace is substituted into its
    # own clause; without it every clause compares against the literal text
    # "{namespace}".
    namespace_range = ' or '.join(
        f'r["namespace"] == "{namespace}"' for namespace in namespaces)
    if not namespace_range:
        # An empty predicate would render as `filter(fn: (r) => )`, which is
        # not a valid Flux expression.
        return
    flux_script = f'''
from(bucket: "kubernetes")
    |> range(start: {start}, stop: {stop})
    |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
    |> filter(fn: (r) => r["_field"]=="cpu_usage_core_nanoseconds")
    |> filter(fn: (r) => {namespace_range})
    |> group(columns: ["_time", "namespace", "container_name"])
    |> count()
    |> group(columns: ["namespace", "container_name"])
    |> top(n: 3)
    |> min()
'''
    yield from _query_stream(flux_script)
45+
46+
47+
def query_network_flow(
        namespaces: Iterator[str], start: int, stop: int) -> Iterator[FluxRecord]:
    """Yield network traffic records for the given namespaces.

    The Flux query reads the ``rx_bytes`` and ``tx_bytes`` fields of the
    ``kubernetes_pod_network`` measurement, applies ``increase()`` and
    ``last()``, then pivots the fields into columns so each record carries
    both ``rx_bytes`` and ``tx_bytes``.

    Args:
        namespaces: Namespace names; a record matches when its ``namespace``
            equals any of them.
        start: Value interpolated as the ``start`` argument of ``range()``.
        stop: Value interpolated as the ``stop`` argument of ``range()``.

    Yields:
        The records returned by the query. Nothing is yielded, and no query is
        sent, when ``namespaces`` is empty.
    """
    # The f-prefix is required so that each namespace is substituted into its
    # own clause; without it every clause compares against the literal text
    # "{namespace}".
    namespace_range = ' or '.join(
        f'r["namespace"] == "{namespace}"' for namespace in namespaces)
    if not namespace_range:
        # An empty predicate would render as `filter(fn: (r) => )`, which is
        # not a valid Flux expression.
        return
    flux_script = f'''
from(bucket: "kubernetes")
    |> range(start: {start}, stop: {stop})
    |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_network")
    |> filter(fn: (r) => r["_field"] == "rx_bytes" or r["_field"] == "tx_bytes")
    |> filter(fn: (r) => {namespace_range})
    |> increase()
    |> last()
    |> pivot(
        rowKey:["_time"],
        columnKey: ["_field"],
        valueColumn: "_value"
    )
'''
    yield from _query_stream(flux_script)
66+
67+
68+
def query_cpu_usage(
        namespace, container_type, start, stop, every) -> Iterator[FluxRecord]:
    """Yield windowed max and mean CPU usage records for one container type.

    The Flux query reads ``cpu_usage_nanocores`` points of the
    ``kubernetes_pod_container`` measurement whose ``namespace`` equals
    ``namespace`` and whose ``pod_name`` matches the regular expression
    ``/<namespace>-<container_type>/``, grouped by ``container_name``.
    Two results are produced from that same stream: ``"max"`` and ``"mean"``.

    Args:
        namespace: Namespace to filter on; also the first half of the pod
            name pattern.
        container_type: Second half of the pod name pattern.
        start: Value interpolated as the ``start`` argument of ``range()``.
        stop: Value interpolated as the ``stop`` argument of ``range()``.
        every: Value interpolated as the ``every`` argument of
            ``aggregateWindow()``.

    Yields:
        The records of both results; the ``result`` column of each record is
        ``"max"`` or ``"mean"``.
    """
    pod_pattern = "/%s-%s/" % (namespace, container_type)
    # Both aggregations start from the same filtered stream. Chaining the
    # mean after the max yield would average the per-window maxima instead of
    # the raw samples.
    flux_script = f"""
data = from(bucket: "kubernetes")
    |> range(start: {start}, stop: {stop})
    |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
    |> filter(fn: (r) => r["_field"] == "cpu_usage_nanocores")
    |> filter(fn: (r) => r["namespace"] == "{namespace}")
    |> filter(fn: (r) => r["pod_name"] =~ {pod_pattern})
    |> group(columns: ["container_name"])

data
    |> aggregateWindow(every: {every}, fn: max, createEmpty: false)
    |> yield(name: "max")

data
    |> aggregateWindow(every: {every}, fn: mean, createEmpty: false)
    |> yield(name: "mean")
"""
    yield from _query_stream(flux_script)
84+
85+
86+
def query_memory_usage(
        namespace, container_type, start, stop, every) -> Iterator[FluxRecord]:
    """Yield windowed max and mean memory usage records for one container type.

    The Flux query reads ``memory_usage_bytes`` points of the
    ``kubernetes_pod_container`` measurement whose ``namespace`` equals
    ``namespace`` and whose ``pod_name`` matches the regular expression
    ``/<namespace>-<container_type>/``, grouped by ``container_name``.
    Two results are produced from that same stream: ``"max"`` and ``"mean"``.

    Args:
        namespace: Namespace to filter on; also the first half of the pod
            name pattern.
        container_type: Second half of the pod name pattern.
        start: Value interpolated as the ``start`` argument of ``range()``.
        stop: Value interpolated as the ``stop`` argument of ``range()``.
        every: Value interpolated as the ``every`` argument of
            ``aggregateWindow()``.

    Yields:
        The records of both results; the ``result`` column of each record is
        ``"max"`` or ``"mean"``.
    """
    pod_pattern = "/%s-%s/" % (namespace, container_type)
    # Both aggregations start from the same filtered stream. Chaining the
    # mean after the max yield would average the per-window maxima instead of
    # the raw samples.
    flux_script = f"""
data = from(bucket: "kubernetes")
    |> range(start: {start}, stop: {stop})
    |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container")
    |> filter(fn: (r) => r["_field"] == "memory_usage_bytes")
    |> filter(fn: (r) => r["namespace"] == "{namespace}")
    |> filter(fn: (r) => r["pod_name"] =~ {pod_pattern})
    |> group(columns: ["container_name"])

data
    |> aggregateWindow(every: {every}, fn: max, createEmpty: false)
    |> yield(name: "max")

data
    |> aggregateWindow(every: {every}, fn: mean, createEmpty: false)
    |> yield(name: "mean")
"""
    yield from _query_stream(flux_script)
102+
103+
104+
def query_network_usage(
        namespace, container_type, start, stop, every) -> Iterator[FluxRecord]:
    """Yield windowed network traffic deltas for one container type.

    The Flux query reads the ``rx_bytes`` and ``tx_bytes`` fields of the
    ``kubernetes_pod_network`` measurement whose ``namespace`` equals
    ``namespace`` and whose ``pod_name`` matches the regular expression
    ``/<namespace>-<container_type>/``. Points are grouped by
    (``container_name``, ``_field``), reduced to the per-window maximum,
    turned into non-negative differences between consecutive windows, pivoted
    so both fields become columns, and limited to 3000 rows.

    Args:
        namespace: Namespace to filter on; also the first half of the pod
            name pattern.
        container_type: Second half of the pod name pattern.
        start: Value interpolated as the ``start`` argument of ``range()``.
        stop: Value interpolated as the ``stop`` argument of ``range()``.
        every: Value interpolated as the ``every`` argument of
            ``aggregateWindow()``.

    Yields:
        The records returned by the query.
    """
    # Render the Flux regex literal once instead of inline in the template.
    pod_regex = "/%s-%s/" % (namespace, container_type)
    script = f"""
from(bucket: "kubernetes")
|> range(start: {start}, stop: {stop})
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_network")
|> filter(fn: (r) => r["_field"] == "rx_bytes" or r["_field"] == "tx_bytes")
|> filter(fn: (r) => r["namespace"] == "{namespace}")
|> filter(fn: (r) => r["pod_name"] =~ {pod_regex})
|> group(columns: ["container_name", "_field"])
|> aggregateWindow(every: {every}, fn: max, createEmpty: false)
|> difference(nonNegative: true)
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
|> limit(n:3000)
"""
    for flux_record in _query_stream(script):
        yield flux_record

rootfs/api/management/commands/measure_app.py

Lines changed: 28 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,62 @@
11
import uuid
22
import time
33
import logging
4-
from contextlib import closing
54
from django.utils import timezone
65
from django.core.management.base import BaseCommand
76
from django.conf import settings
7+
from api import influxdb
88
from api.tasks import measure_networks, measure_instances
99
from api.models import App
10-
from api.utils import get_influxdb_client
1110

1211
logger = logging.getLogger(__name__)
1312

1413

1514
class Command(BaseCommand):
1615
"""Management command for push data to manager"""
1716

18-
def _build_query_networks_flux(self, app_map, timestamp):
19-
timestamp = int(timestamp)
20-
stop = timestamp - (timestamp % 3600)
21-
start = stop - 3600
22-
namespace_range = ' or '.join(
23-
['r["namespace"] == \"{app_id}\"' for app_id in app_map.keys()])
24-
return f'''
25-
from(bucket: "kubernetes")
26-
|> range(start: {start}, stop: {stop})
27-
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_network"
28-
and ({namespace_range}))
29-
|> pivot(
30-
rowKey:["_time"],
31-
columnKey: ["_field"],
32-
valueColumn: "_value"
33-
)
34-
|> increase(columns: ["rx_bytes", "tx_bytes", "tx_errors", "rx_errors"])
35-
|> last(column: "_time")
36-
'''
37-
38-
def _build_query_instances_flux(self, app_map, timestamp):
39-
timestamp = int(timestamp)
17+
def _measure_networks(self, app_map, timestamp):
4018
stop = timestamp - (timestamp % 3600)
4119
start = stop - 3600
42-
namespace_range = ' or '.join(
43-
['r["namespace"] == \"{app_id}\"' for app_id in app_map.keys()])
44-
return f'''
45-
from(bucket: "kubernetes")
46-
|> range(start: {start}, stop: {stop})
47-
|> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container"
48-
and r["_field"]=="cpu_usage_core_nanoseconds"
49-
and ({namespace_range}))
50-
|> group(columns: ["_time", "namespace", "container_name"])
51-
|> count(column: "_value")
52-
|> group(columns: ["namespace", "container_name"])
53-
|> top(n: 3)
54-
|> min(column: "_value")
55-
'''
56-
57-
def _measure_networks(self, app_map, timestamp):
5820
networks = []
59-
with closing(get_influxdb_client()) as client:
60-
with closing(client.query_api()) as query_api:
61-
with closing(query_api.query_stream(
62-
self._build_query_networks_flux(app_map, timestamp))
63-
) as records:
64-
for record in records:
65-
app_id = record["namespace"]
66-
owner_id = app_map[app_id].owner_id
67-
networks.append({
68-
"app_id": app_id,
69-
"owner_id": owner_id,
70-
"pod_name": record["pod_name"],
71-
"rx_bytes": record["rx_bytes"],
72-
"tx_bytes": record["tx_bytes"],
73-
"timestamp": timestamp
74-
})
21+
for record in influxdb.query_network_flow(app_map.keys(), start, stop):
22+
app_id = record["namespace"]
23+
owner_id = app_map[app_id].owner_id
24+
networks.append({
25+
"app_id": app_id,
26+
"user_id": owner_id,
27+
"pod_name": record["pod_name"],
28+
"rx_bytes": record["rx_bytes"],
29+
"tx_bytes": record["tx_bytes"],
30+
"timestamp": timestamp
31+
})
7532
measure_networks.delay(networks)
7633

7734
def _measure_instances(self, app_map, timestamp):
35+
stop = timestamp - (timestamp % 3600)
36+
start = stop - 3600
7837
instances = []
79-
with closing(get_influxdb_client()) as client:
80-
with closing(client.query_api()) as query_api:
81-
with closing(query_api.query_stream(
82-
self._build_query_instances_flux(app_map, timestamp))
83-
) as records:
84-
for record in records:
85-
app_id = record["namespace"]
86-
owner_id = app_map[app_id].owner_id
87-
container_type = record["container_name"].replace(f"-{app_id}", "", 1)
88-
instances.append({
89-
"app_id": app_id,
90-
"owner_id": owner_id,
91-
"container_type": container_type,
92-
"container_count": record["_value"],
93-
"timestamp": timestamp
94-
})
38+
for record in influxdb.query_container_count(app_map.keys(), start, stop):
39+
app_id = record["namespace"]
40+
owner_id = app_map[app_id].owner_id
41+
container_type = record["container_name"].replace(f"{app_id}-", "", 1)
42+
instances.append({
43+
"app_id": app_id,
44+
"user_id": owner_id,
45+
"container_type": container_type,
46+
"container_count": record["_value"],
47+
"timestamp": timestamp
48+
})
9549
measure_instances.delay(instances)
9650

9751
def handle(self, *args, **options):
9852
if settings.WORKFLOW_MANAGER_URL is not None:
99-
timestamp = time.time()
53+
timestamp = int(time.time())
10054
task_id = uuid.uuid4().hex
10155
logger.info(f"pushing {task_id} networks to workflow_manager when {timezone.now()}")
10256
app_map = {}
10357
for app in App.objects.all():
10458
app_map[app.id] = app
105-
if len(app_map) % 1000 == 0:
59+
if len(app_map) % 300 == 0:
10660
self._measure_networks(app_map, timestamp)
10761
self._measure_instances(app_map, timestamp)
10862
app_map = {}

rootfs/api/utils.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,9 @@
66
import hashlib
77
import logging
88
import random
9-
import threading
109
import math
1110
from copy import deepcopy
12-
from django.conf import settings
13-
from influxdb_client import InfluxDBClient
1411

15-
local = threading.local()
1612
logger = logging.getLogger(__name__)
1713

1814

@@ -165,16 +161,6 @@ def apply_tasks(tasks):
165161
executor.shutdown(wait=True)
166162

167163

168-
def get_influxdb_client():
169-
if not hasattr(local, "influxdb_client"):
170-
local.influxdb_client = InfluxDBClient(
171-
url=settings.DRYCC_INFLUXDB_URL,
172-
token=settings.DRYCC_INFLUXDB_TOKEN,
173-
org=settings.DRYCC_INFLUXDB_ORG
174-
)
175-
return local.influxdb_client
176-
177-
178164
def unit_to_bytes(size):
179165
"""
180166
size: str

rootfs/api/views.py

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
"""
44
import logging
55
from copy import deepcopy
6-
76
from django.http import Http404, HttpResponse
87
from django.conf import settings
98
from django.contrib.auth.models import User
@@ -19,10 +18,11 @@
1918
from rest_framework.viewsets import GenericViewSet
2019
from rest_framework.authtoken.models import Token
2120

22-
from api import authentication, models, permissions, serializers, viewsets
21+
from api import influxdb, authentication, models, permissions, serializers, viewsets
2322
from api.models import AlreadyExists, ServiceUnavailable, DryccException, \
2423
UnprocessableEntity
2524

25+
2626
logger = logging.getLogger(__name__)
2727

2828

@@ -908,3 +908,81 @@ def disable(self, request, **kwargs):
908908
user.is_active = False
909909
user.save(update_fields=['is_active', ])
910910
return Response(status=status.HTTP_204_NO_CONTENT)
911+
912+
913+
class MetricView(BaseDryccViewSet):
    """Expose monitoring metrics of an app, read from InfluxDB."""

    def _get_app(self):
        """Return the app named by the ``app_id`` URL kwarg.

        Raises ``Http404`` (via ``get_object_or_404``) when no such app exists
        and runs the view's object-level permission checks before returning.
        """
        app = get_object_or_404(models.App, id=self.kwargs['app_id'])
        self.check_object_permissions(self.request, app)
        return app

    @staticmethod
    def _split_max_and_mean(records):
        """Sort usage records into max and mean series.

        Records whose ``result`` column is ``"mean"`` go to ``avg_total``;
        every other record goes to ``max_total``. Each entry is a
        ``(value, timestamp)`` tuple.
        """
        max_total, avg_total = [], []
        for record in records:
            # NOTE(review): the Flux scripts in api/influxdb.py do not visibly
            # create a column named "timestamp" -- confirm this key exists on
            # the records (the time column may be "_time").
            point = (record["_value"], record["timestamp"])
            if record["result"] == "mean":
                avg_total.append(point)
            else:
                max_total.append(point)
        return {
            "max_total": max_total,
            "avg_total": avg_total
        }

    def _get_cpus(self, app_id, container_type, start, stop, every):
        """Return the max/mean CPU usage series for one container type."""
        # Must query CPU usage; reading the memory query here would duplicate
        # the memory series under the CPU key.
        return self._split_max_and_mean(
            influxdb.query_cpu_usage(app_id, container_type, start, stop, every))

    def _get_memory(self, app_id, container_type, start, stop, every):
        """Return the max/mean memory usage series for one container type."""
        return self._split_max_and_mean(
            influxdb.query_memory_usage(app_id, container_type, start, stop, every))

    def _get_networks(self, app_id, container_type, start, stop, every):
        """Return ``(rx_bytes, tx_bytes, timestamp)`` tuples for one container type."""
        # NOTE(review): same "timestamp" column caveat as in
        # _split_max_and_mean -- confirm the query emits it.
        return [
            (record["rx_bytes"], record["tx_bytes"], record["timestamp"])
            for record in influxdb.query_network_usage(
                app_id, container_type, start, stop, every)
        ]

    def _get_container_count(self, app_id, container_type, start, stop):
        """Return the container count for ``<app_id>-<container_type>``.

        Returns 0 when the query yields no record for that container name.
        """
        container_name = "%s-%s" % (app_id, container_type)
        for record in influxdb.query_container_count([app_id, ], start, stop):
            if record["container_name"] == container_name:
                return record["_value"]
        return 0

    def status(self, request, **kwargs):
        """Return the metrics of one container type of an app.

        Reads ``container_type``, ``start``, ``stop`` and ``every`` from the
        URL kwargs. The response body has this shape (values illustrative):

        {
            app_id: "django_t1",
            container_type: "web",
            container_count: 1,
            cpu_usage_list: {
                max_total: [(50000, 1611023853)],
                avg_total: [(50000, 1611023853)]
            },
            memory: {
                max_total: [(50000, 1611023853)],
                avg_total: [(50000, 1611023853)]
            },
            networks: [
                (10000, 50000, 1611023853)
            ]
        }
        """
        app_id, container_type = self._get_app().pk, kwargs['container_type']
        start, stop, every = kwargs['start'], kwargs['stop'], kwargs["every"]
        # The payload is wrapped in a Response, like the other views in this
        # file, rather than returned as a bare dict.
        # NOTE(review): an earlier description of this payload called the CPU
        # key "cpus"; the key built here is "cpu_usage_list" -- confirm which
        # name clients expect.
        return Response({
            "app_id": app_id,
            "container_type": container_type,
            "container_count": self._get_container_count(app_id, container_type, start, stop),
            "cpu_usage_list": self._get_cpus(app_id, container_type, start, stop, every),
            "memory": self._get_memory(app_id, container_type, start, stop, every),
            "networks": self._get_networks(app_id, container_type, start, stop, every)
        })

0 commit comments

Comments
 (0)