Skip to content

Commit 83a9bd7

Browse files
committed
feat(workflow-manager): add workflow-manager support
1 parent df92e6f commit 83a9bd7

17 files changed

Lines changed: 539 additions & 208 deletions

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ PLATFORM ?= linux/amd64,linux/arm64
77

88
include versioning.mk
99

10-
SHELLCHECK_PREFIX := docker run -v ${CURDIR}:/workdir -w /workdir drycc/go-dev shellcheck
10+
SHELLCHECK_PREFIX := docker run -v ${CURDIR}:/workdir -w /workdir ${DRYCC_REGISTRY}drycc/go-dev shellcheck
1111
SHELL_SCRIPTS = $(wildcard rootfs/bin/*) $(shell find "rootfs" -name '*.sh') $(wildcard _scripts/*.sh)
1212

1313
# Test processes used in quick unit testing

charts/controller/templates/controller-cronjob-daily.yaml

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,51 @@ spec:
3131
readOnly: true
3232
- image: {{.Values.image_registry}}{{.Values.org}}/controller:{{.Values.image_tag}}
3333
imagePullPolicy: {{.Values.pull_policy}}
34-
name: drycc-controller-load-db-state-to-k8s
34+
name: drycc-controller-measure-app
3535
command:
3636
- /bin/bash
3737
- -c
3838
args:
39-
- python -u /app/manage.py load_db_state_to_k8s
39+
- python -u /app/manage.py measure_app
40+
{{- include "controller.envs" . | indent 12 }}
41+
volumeMounts:
42+
- mountPath: /etc/slugrunner
43+
name: slugrunner-config
44+
readOnly: true
45+
- image: {{.Values.image_registry}}{{.Values.org}}/controller:{{.Values.image_tag}}
46+
imagePullPolicy: {{.Values.pull_policy}}
47+
name: drycc-controller-measure-config
48+
command:
49+
- /bin/bash
50+
- -c
51+
args:
52+
- python -u /app/manage.py measure_config
53+
{{- include "controller.envs" . | indent 12 }}
54+
volumeMounts:
55+
- mountPath: /etc/slugrunner
56+
name: slugrunner-config
57+
readOnly: true
58+
- image: {{.Values.image_registry}}{{.Values.org}}/controller:{{.Values.image_tag}}
59+
imagePullPolicy: {{.Values.pull_policy}}
60+
name: drycc-controller-measure-resources
61+
command:
62+
- /bin/bash
63+
- -c
64+
args:
65+
- python -u /app/manage.py measure_resources
66+
{{- include "controller.envs" . | indent 12 }}
67+
volumeMounts:
68+
- mountPath: /etc/slugrunner
69+
name: slugrunner-config
70+
readOnly: true
71+
- image: {{.Values.image_registry}}{{.Values.org}}/controller:{{.Values.image_tag}}
72+
imagePullPolicy: {{.Values.pull_policy}}
73+
name: drycc-controller-measure-volumes
74+
command:
75+
- /bin/bash
76+
- -c
77+
args:
78+
- python -u /app/manage.py measure_volumes
4079
{{- include "controller.envs" . | indent 12 }}
4180
volumeMounts:
4281
- mountPath: /etc/slugrunner
@@ -46,3 +85,4 @@ spec:
4685
- name: slugrunner-config
4786
configMap:
4887
name: slugrunner-config
88+
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import uuid
2+
import time
3+
from contextlib import closing
4+
from django.utils import timezone
5+
from django.core.management.base import BaseCommand
6+
from django.conf import settings
7+
from api.tasks import measure_networks, measure_instances
8+
from api.models import App
9+
from api.utils import get_influxdb_client
10+
11+
12+
class Command(BaseCommand):
    """Management command that collects per-app network and instance usage.

    For every ``App`` it queries the ``kubernetes`` InfluxDB bucket for the
    last complete hour and hands the collected rows to the
    ``measure_networks`` / ``measure_instances`` tasks. Apps are processed
    in batches so a single Flux query never has to filter on an unbounded
    number of namespaces.
    """

    # Maximum number of apps covered by one pair of Flux queries.
    batch_size = 1000

    @staticmethod
    def _hour_window(timestamp):
        """Return ``(start, stop)`` in Unix seconds for the last full hour.

        ``stop`` is *timestamp* truncated to the hour; ``start`` is one hour
        earlier.
        """
        timestamp = int(timestamp)
        stop = timestamp - (timestamp % 3600)
        return stop - 3600, stop

    @staticmethod
    def _namespace_filter(app_map):
        """Build the Flux predicate matching any namespace keyed in *app_map*."""
        # This must be an f-string: a plain string would emit the literal
        # text "{app_id}" and the filter would never match a real namespace.
        return ' or '.join(
            f'r["namespace"] == "{app_id}"' for app_id in app_map)

    def _build_query_networks_flux(self, app_map, timestamp):
        """Return the Flux query for per-pod network counters of *app_map*."""
        start, stop = self._hour_window(timestamp)
        namespace_range = self._namespace_filter(app_map)
        return f'''
            from(bucket: "kubernetes")
                |> range(start: {start}, stop: {stop})
                |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_network"
                    and ({namespace_range}))
                |> pivot(
                    rowKey:["_time"],
                    columnKey: ["_field"],
                    valueColumn: "_value"
                )
                |> increase(columns: ["rx_bytes", "tx_bytes", "tx_errors", "rx_errors"])
                |> last(column: "_time")
        '''

    def _build_query_instances_flux(self, app_map, timestamp):
        """Return the Flux query for per-container counts of *app_map*."""
        start, stop = self._hour_window(timestamp)
        namespace_range = self._namespace_filter(app_map)
        return f'''
            from(bucket: "kubernetes")
                |> range(start: {start}, stop: {stop})
                |> filter(fn: (r) => r["_measurement"] == "kubernetes_pod_container"
                    and r["_field"]=="cpu_usage_core_nanoseconds"
                    and ({namespace_range}))
                |> group(columns: ["_time", "namespace", "container_name"])
                |> count(column: "_value")
                |> group(columns: ["namespace", "container_name"])
                |> top(n: 3)
                |> min(column: "_value")
        '''

    def _measure_networks(self, app_map, timestamp):
        """Query network usage for *app_map* and queue ``measure_networks``."""
        networks = []
        flux = self._build_query_networks_flux(app_map, timestamp)
        with closing(get_influxdb_client()) as client:
            with closing(client.query_api()) as query_api:
                with closing(query_api.query_stream(flux)) as records:
                    for record in records:
                        app_id = record["namespace"]
                        networks.append({
                            "app_id": app_id,
                            "user_id": app_map[app_id].user_id,
                            "pod_name": record["pod_name"],
                            "rx_bytes": record["rx_bytes"],
                            "tx_bytes": record["tx_bytes"],
                            "timestamp": timestamp,
                        })
        measure_networks.delay(networks)

    def _measure_instances(self, app_map, timestamp):
        """Query container counts for *app_map* and queue ``measure_instances``."""
        instances = []
        flux = self._build_query_instances_flux(app_map, timestamp)
        with closing(get_influxdb_client()) as client:
            with closing(client.query_api()) as query_api:
                with closing(query_api.query_stream(flux)) as records:
                    for record in records:
                        app_id = record["namespace"]
                        # Drop the first "-<app_id>" occurrence from the
                        # container name to obtain the container type.
                        container_type = record["container_name"].replace(
                            f"-{app_id}", "", 1)
                        instances.append({
                            "app_id": app_id,
                            "user_id": app_map[app_id].user_id,
                            "container_type": container_type,
                            "container_count": record["_value"],
                            "timestamp": timestamp,
                        })
        measure_instances.delay(instances)

    def handle(self, *args, **options):
        """Measure every app in batches; no-op when WORKFLOW_MANAGER_URL is unset."""
        if settings.WORKFLOW_MANAGER_URL is None:
            return
        timestamp = time.time()
        task_id = uuid.uuid4().hex
        print(f"pushing {task_id} networks to workflow_manager when {timezone.now()}")
        app_map = {}
        for app in App.objects.all():
            app_map[app.pk] = app
            if len(app_map) >= self.batch_size:
                self._measure_networks(app_map, timestamp)
                self._measure_instances(app_map, timestamp)
                app_map = {}
        # Flush the final, partially filled batch.
        if app_map:
            self._measure_networks(app_map, timestamp)
            self._measure_instances(app_map, timestamp)
        print(f"pushed {task_id} networks to workflow_manager when {timezone.now()}")
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import uuid
2+
import time
3+
from django.utils import timezone
4+
from django.core.management.base import BaseCommand
5+
from django.conf import settings
6+
from api.models import Config
7+
from api.tasks import measure_config
8+
9+
10+
class Command(BaseCommand):
    """Management command that queues config measurements in batches.

    Walks every ``Config`` row, collects what ``to_measurements`` returns
    and hands the buffered measurements to the ``measure_config`` task.
    """

    # Flush the buffer once it holds at least this many measurements.
    batch_size = 1000

    def handle(self, *args, **options):
        """Queue all config measurements; no-op when WORKFLOW_MANAGER_URL is unset."""
        if settings.WORKFLOW_MANAGER_URL is None:
            return
        timestamp = time.time()
        task_id = uuid.uuid4().hex
        print(f"pushing {task_id} limits to workflow_manager when {timezone.now()}")
        config_list = []
        for config in Config.objects.all():
            config_list.extend(config.to_measurements(timestamp))
            # ">=" rather than "% == 0": extend() may add several items at
            # once and step over an exact multiple, which would let the
            # buffer grow without ever being flushed.
            if len(config_list) >= self.batch_size:
                measure_config.delay(config_list)
                config_list = []
        # Final partial batch; passed as one list argument, exactly like
        # the in-loop flush (not unpacked into positional arguments).
        if config_list:
            measure_config.delay(config_list)
        print(f"pushed {task_id} limits to workflow_manager when {timezone.now()}")
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import uuid
2+
import time
3+
from django.utils import timezone
4+
from django.core.management.base import BaseCommand
5+
from django.conf import settings
6+
from api.models import Resource
7+
from api.tasks import measure_resources
8+
9+
10+
class Command(BaseCommand):
    """Management command that queues resource measurements in batches.

    Walks every ``Resource`` row, collects what ``to_measurements`` returns
    and hands the buffered measurements to the ``measure_resources`` task.
    """

    # Flush the buffer once it holds at least this many measurements.
    batch_size = 1000

    def handle(self, *args, **options):
        """Queue all resource measurements; no-op when WORKFLOW_MANAGER_URL is unset."""
        if settings.WORKFLOW_MANAGER_URL is None:
            return
        timestamp = time.time()
        task_id = uuid.uuid4().hex
        print(f"pushing {task_id} resources to workflow_manager when {timezone.now()}")
        resource_list = []
        for resource in Resource.objects.all():
            # NOTE(review): the original called `to_to_measurements`, which
            # looks like a typo for the `to_measurements` method used by the
            # sibling config/volume commands - confirm against the model.
            resource_list.extend(resource.to_measurements(timestamp))
            # ">=" rather than "% == 0": extend() may add several items at
            # once and step over an exact multiple, which would let the
            # buffer grow without ever being flushed.
            if len(resource_list) >= self.batch_size:
                measure_resources.delay(resource_list)
                resource_list = []
        # Final partial batch; passed as one list argument, exactly like
        # the in-loop flush (not unpacked into positional arguments).
        if resource_list:
            measure_resources.delay(resource_list)
        print(f"pushed {task_id} resources to workflow_manager when {timezone.now()}")
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import uuid
2+
import time
3+
from django.utils import timezone
4+
from django.core.management.base import BaseCommand
5+
from django.conf import settings
6+
from api.models import Volume
7+
from api.tasks import measure_volumes
8+
9+
10+
class Command(BaseCommand):
    """Management command that queues volume measurements in batches.

    Walks every ``Volume`` row, collects what ``to_measurements`` returns
    and hands the buffered measurements to the ``measure_volumes`` task.
    """

    # Flush the buffer once it holds at least this many measurements.
    batch_size = 1000

    def handle(self, *args, **options):
        """Queue all volume measurements; no-op when WORKFLOW_MANAGER_URL is unset."""
        if settings.WORKFLOW_MANAGER_URL is None:
            return
        timestamp = time.time()
        task_id = uuid.uuid4().hex
        print(f"pushing {task_id} volumes to workflow_manager when {timezone.now()}")
        volume_list = []
        for volume in Volume.objects.all():
            volume_list.extend(volume.to_measurements(timestamp))
            # ">=" rather than "% == 0": extend() may add several items at
            # once and step over an exact multiple, which would let the
            # buffer grow without ever being flushed.
            if len(volume_list) >= self.batch_size:
                measure_volumes.delay(volume_list)
                volume_list = []
        # Final partial batch; passed as one list argument, exactly like
        # the in-loop flush (not unpacked into positional arguments).
        if volume_list:
            measure_volumes.delay(volume_list)
        print(f"pushed {task_id} volumes to workflow_manager when {timezone.now()}")

rootfs/api/management/commands/push_data_to_influxdb.py

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)