Skip to content

Commit 2f84679

Browse files
committed
fix(controller): miss err msg
1 parent 1786cda commit 2f84679

8 files changed

Lines changed: 92 additions & 74 deletions

File tree

charts/controller/templates/controller-celery-deloyment.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ spec:
3939
- $(DRYCC_REDIS_ADDRS),$(DRYCC_CONTROLLER_API_SERVICE_HOST):$(DRYCC_CONTROLLER_API_SERVICE_PORT)
4040
{{- include "controller.envs" . | indent 8 }}
4141
containers:
42-
- name: drycc-controller-celery
42+
{{- range $key := (list "low" "middle" "high") }}
43+
- name: drycc-controller-celery-{{$key}}
4344
image: {{$.Values.imageRegistry}}/{{$.Values.imageOrg}}/controller:{{$.Values.imageTag}}
4445
imagePullPolicy: {{$.Values.imagePullPolicy}}
4546
args:
4647
- /bin/bash
4748
- -c
48-
- celery -A api worker --autoscale=32,1 --loglevel=WARNING
49+
- celery -A api worker -Q {{$key}} --autoscale=32,1 --loglevel=WARNING
4950
{{- include "controller.limits" $ | indent 8 }}
5051
{{- include "controller.envs" $ | indent 8 }}
52+
{{- end }}

rootfs/api/consumers.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22
import time
3+
import six
4+
import ssl
35
import aiohttp
46
import asyncio
57
import collections
@@ -10,6 +12,7 @@
1012
from kubernetes.client import Configuration
1113
from kubernetes.client.api import core_v1_api
1214
from kubernetes.stream import stream
15+
from kubernetes.stream.ws_client import STDOUT_CHANNEL, STDERR_CHANNEL, ERROR_CHANNEL
1316

1417
from channels.db import database_sync_to_async
1518
from channels.exceptions import DenyConnection
@@ -85,25 +88,27 @@ async def connect(self):
8588
await super().connect()
8689
self.pod_id = self.scope["url_route"]["kwargs"]["pod_id"]
8790

88-
async def send(self, data):
91+
async def send(self, data, channel=STDOUT_CHANNEL):
92+
channel_prefix = chr(channel)
8993
if data is None:
9094
return
9195
elif isinstance(data, bytes):
92-
await super().send(bytes_data=data)
96+
channel_prefix = six.binary_type(channel_prefix, "ascii")
97+
await super().send(bytes_data=channel_prefix+data)
9398
elif isinstance(data, str):
94-
await super().send(text_data=data)
99+
await super().send(text_data=channel_prefix+data)
95100

96101
async def task(self):
97102
deadline = time.time() + settings.DRYCC_APP_POD_EXEC_TIMEOUT
98103
while self.stream.is_open() and self.conneted and time.time() < deadline:
99-
await sync_to_async(self.stream.update)(0.1)
100-
if self.stream.peek_stdout():
101-
data = self.stream.read_stdout()
102-
elif self.stream.peek_stderr():
103-
data = self.stream.read_stderr()
104-
else:
105-
data = None
106-
await self.send(data)
104+
try:
105+
await sync_to_async(self.stream.update)(0.1)
106+
for channel in (ERROR_CHANNEL, STDOUT_CHANNEL, STDERR_CHANNEL):
107+
if channel in self.stream._channels:
108+
data = self.stream.read_channel(channel)
109+
await self.send(data, channel)
110+
except ssl.SSLEOFError:
111+
break
107112
await self.close(code=1000)
108113

109114
async def disconnect(self, close_code):
@@ -115,14 +120,9 @@ async def receive(self, text_data=None, bytes_data=None):
115120
if self.stream is None and text_data is not None:
116121
args = (self.kubernetes.connect_get_namespaced_pod_exec, self.pod_id, self.id)
117122
kwargs = json.loads(text_data)
118-
kwargs.update({"stderr": True, "stdout": True})
119-
if kwargs["stdin"]:
120-
kwargs.update({"_preload_content": False})
121-
self.stream = stream(*args, **kwargs)
122-
asyncio.create_task(self.task())
123-
else:
124-
await self.send(stream(*args, **kwargs))
125-
await self.close(code=1000)
123+
kwargs.update({"stderr": True, "stdout": True, "_preload_content": False})
124+
self.stream = stream(*args, **kwargs)
125+
asyncio.create_task(self.task())
126126
elif self.stream is not None:
127127
data = text_data if text_data else bytes_data
128128
channel, data = ord(data[0]), data[1:]

rootfs/api/models/app.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def deploy(self, release, force_deploy=False, rollback_on_failure=True): # noqa
381381
self.cleanup_old()
382382
release.cleanup_old()
383383

384-
def mount(self, user, volume):
384+
def mount(self, user, volume, structure=None):
385385
if self.release_set.filter(failed=False).latest().build is None:
386386
raise DryccException('No build associated with this release')
387387
release = self.release_set.filter(failed=False).latest()
@@ -391,7 +391,8 @@ def mount(self, user, volume):
391391
user,
392392
volume,
393393
self.release_set.filter(failed=False, canary=False).latest(),
394-
app_settings
394+
app_settings,
395+
structure=structure,
395396
)
396397
self._mount(user, volume, release, app_settings)
397398

@@ -699,14 +700,14 @@ def _clean_app_logs(self):
699700
err = 'Error deleting existing application logs: {}'.format(e)
700701
self.log(err, logging.WARNING)
701702

702-
def _mount(self, user, volume, release, app_settings):
703-
volumes = list(Volume.objects.filter(app=self).exclude(name=volume.name))
704-
volumes.append(volume)
703+
def _mount(self, user, volume, release, app_settings, structure=None):
704+
volumes = Volume.objects.filter(app=self)
705705
tasks = []
706-
for scale_type, replicas in self.structure.items():
706+
for scale_type, replicas in structure.items() if structure else self.structure.items():
707707
if not release.canary or scale_type in app_settings.canaries:
708708
replicas = self.structure.get(scale_type, 0)
709-
scale_type_volumes = [_ for _ in volumes if scale_type in _.path.keys()]
709+
scale_type_volumes = [
710+
volume for volume in volumes if scale_type in volume.path.keys()]
710711
data = self._gather_app_settings(
711712
release, app_settings, scale_type, replicas, volumes=scale_type_volumes)
712713
deployment = self._scheduler.deployment.get(
@@ -735,9 +736,7 @@ def _mount(self, user, volume, release, app_settings):
735736
err = f'(changed volume mount for {volume}: {e}'
736737
self.log(err, logging.ERROR)
737738
raise ServiceUnavailable(err) from e
738-
739-
msg = f'{user.username} changed volume mount for {volume}'
740-
self.log(msg)
739+
self.log(f'{user.username} changed volume mount for {volume}')
741740

742741
def _scale(self, user, structure, release, app_settings): # noqa
743742
"""Scale containers up or down to match requested structure."""

rootfs/api/models/volume.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ def save(self, *args, **kwargs):
2929
# Attach volume, updates k8s
3030
if self.type == "csi" and self.created == self.updated:
3131
self._create_pvc()
32+
# check path
33+
self._check_path()
3234
# Save to DB
3335
return super(Volume, self).save(*args, **kwargs)
3436

@@ -97,10 +99,24 @@ def _format_size(size):
9799
size = size.upper() + "i"
98100
return size
99101

102+
def _check_path(self):
103+
other_volumes = self.app.volume_set.exclude(name=self.name)
104+
type_paths = {} # {'type1':[path1,path2], tyep2:[path3,path4]}
105+
for _ in other_volumes:
106+
for k, v in _.path.items():
107+
if k not in type_paths:
108+
type_paths[k] = [v]
109+
else:
110+
type_paths[k].append(v)
111+
repeat_path = [v for k, v in self.path.items() if v in type_paths.get(k, [])]
112+
if repeat_path:
113+
raise DryccException("path {} is used by another volume".
114+
format(','.join(repeat_path)))
115+
100116
def _create_pvc(self):
101117
try:
102118
self._scheduler.pvc.get(self.app.id, self.name)
103-
err = "Volume {} already exists in this namespace".format(self.name) # noqa
119+
err = "Volume {} already exists in this namespace".format(self.name)
104120
self.log(err, logging.INFO)
105121
raise AlreadyExists(err)
106122
except KubeException as e:

rootfs/api/settings/celery.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ class Config(object):
3636
'exchange': 'controller.priority',
3737
'routing_key': 'controller.priority.high',
3838
},
39+
'api.tasks.mount_app': {
40+
'queue': 'low',
41+
'exchange': 'controller.priority',
42+
'routing_key': 'controller.priority.high',
43+
},
3944
'api.tasks.restart_app': {
4045
'queue': 'high',
4146
'exchange': 'controller.priority',
@@ -46,6 +51,11 @@ class Config(object):
4651
'exchange': 'controller.priority',
4752
'routing_key': 'controller.priority.high',
4853
},
54+
'api.tasks.downstream_model_owner': {
55+
'queue': 'high',
56+
'exchange': 'controller.priority',
57+
'routing_key': 'controller.priority.high',
58+
},
4959
'api.tasks.send_measurements': {
5060
'queue': 'middle',
5161
'exchange': 'controller.priority',

rootfs/api/tasks.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,27 @@ def restart_app(app, **kwargs):
9494
retry_jitter=True,
9595
retry_kwargs={'max_retries': 3}
9696
)
97-
def mount_app(app, user, volume):
97+
def mount_app(app, user, volume, path):
9898
task_id = uuid.uuid4().hex
9999
signals.request_started.send(sender=task_id)
100100
try:
101-
app.mount(user, volume)
101+
# merge mount volume path and remove keys if a null value is provided
102+
for key, value in path.items():
103+
if value is None:
104+
if key not in volume.path:
105+
continue
106+
volume.path.pop(key)
107+
else:
108+
volume.path[key] = value
102109
volume.save()
110+
structure = {}
111+
for scale_type, replicas in app.structure.items():
112+
if scale_type in path:
113+
structure[scale_type] = replicas
114+
app.mount(user, volume, structure)
103115
except Exception as e:
104-
print(e)
105116
signals.got_request_exception.send(sender=task_id)
106-
raise e
117+
logger.exception(e)
107118
else:
108119
signals.request_finished.send(sender=task_id)
109120

rootfs/api/tests/test_volume.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,11 @@ def test_volume_mount(self, mock_requests):
186186
url = '/v2/apps/{app_id}/volumes/myvolume1/path'.format(app_id=app_id)
187187
mount_path = {"path": {"web": "/data/web1"}}
188188
response = self.client.patch(url, data=mount_path)
189-
expected = response.data['path']
190-
self.assertEqual(mount_path['path'], expected)
189+
expected = response.data['path'] # old data
190+
self.assertEqual({}, expected)
191+
url = '/v2/apps/{app_id}/volumes/myvolume1'.format(app_id=app_id)
192+
response = self.client.get(url)
193+
self.assertEqual(response.data["path"], mount_path["path"])
191194

192195
def test_volume_unmount(self, mock_requests):
193196
# create
@@ -210,7 +213,7 @@ def test_volume_unmount(self, mock_requests):
210213
mount_path = {"path": {"web": "/data/web1"}}
211214
response = self.client.patch(url, data=mount_path)
212215
expected = response.data['path']
213-
self.assertEqual(mount_path['path'], expected)
216+
self.assertEqual({}, expected)
214217
# check mount
215218
url = '/v2/apps/{app_id}/volumes/myvolume1'.format(app_id=app_id)
216219
response = self.client.get(url)
@@ -221,7 +224,11 @@ def test_volume_unmount(self, mock_requests):
221224
mount_path = {"path": {"web": None}}
222225
response = self.client.patch(url, data=mount_path)
223226
expected = response.data['path']
224-
self.assertEqual({}, expected)
227+
self.assertEqual(mount_path["path"], {'web': None})
228+
# check mount
229+
url = '/v2/apps/{app_id}/volumes/myvolume1'.format(app_id=app_id)
230+
response = self.client.get(url)
231+
self.assertEqual(response.data["path"], {})
225232

226233
def build_deploy(self, app_id):
227234
# post a new build with procfile

rootfs/api/views.py

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020

2121
from api import monitor, models, permissions, serializers, viewsets, authentication
2222
from api.tasks import scale_app, restart_app, mount_app, downstream_model_owner
23-
from api.exceptions import AlreadyExists, ServiceUnavailable, DryccException, \
24-
UnprocessableEntity
23+
from api.exceptions import AlreadyExists, ServiceUnavailable, DryccException
2524

2625
from django.views.decorators.cache import never_cache
2726
from django.contrib.auth import REDIRECT_FIELD_NAME
@@ -714,48 +713,22 @@ def destroy(self, request, **kwargs):
714713
return Response(status=status.HTTP_204_NO_CONTENT)
715714

716715
def path(self, request, *args, **kwargs):
717-
new_path = request.data.get('path')
718-
if new_path is None:
716+
path = request.data.get('path')
717+
if path is None:
719718
raise DryccException("path is a required field")
720719
else:
721-
new_path = serializers.VolumeSerializer().validate_path(new_path)
720+
path = serializers.VolumeSerializer().validate_path(path)
722721
volume = self.get_object()
723-
container_types = [_ for _ in new_path.keys()
722+
container_types = [_ for _ in path.keys()
724723
if _ not in volume.app.types]
725724
if container_types:
726725
raise DryccException("process type {} is not included in profile".
727726
format(','.join(container_types)))
728-
729-
if set(new_path.items()).issubset(set(volume.path.items())):
727+
if set(path.items()).issubset(set(volume.path.items())):
730728
raise DryccException("mount path not changed")
731729

732-
other_volumes = self.get_app().volume_set.exclude(name=volume.name)
733-
type_paths = {} # {'type1':[path1,path2], tyep2:[path3,path4]}
734-
for _ in other_volumes:
735-
for k, v in _.path.items():
736-
if k not in type_paths:
737-
type_paths[k] = [v]
738-
else:
739-
type_paths[k].append(v)
740-
repeat_path = [v for k, v in new_path.items() if v in type_paths.get(k, [])] # noqa
741-
if repeat_path:
742-
raise DryccException("path {} is used by another volume".
743-
format(','.join(repeat_path)))
744-
path = volume.path
745-
# merge mount path
746-
# remove path keys if a null value is provided
747-
for key, value in new_path.items():
748-
if value is None:
749-
# error if unsetting non-existing key
750-
if key not in path:
751-
raise UnprocessableEntity(
752-
'{} does not exist under {}'.format(key, "volume")) # noqa
753-
path.pop(key)
754-
else:
755-
path[key] = value
756730
app = self.get_app()
757-
volume.path = path # volume save by task success
758-
mount_app.delay(app, self.request.user, volume)
731+
mount_app.delay(app, self.request.user, volume, path)
759732
serializer = self.get_serializer(volume, many=False)
760733
return Response(serializer.data)
761734

0 commit comments

Comments
 (0)