Skip to content

Commit d0f6cc3

Browse files
committed
feat(volumes): add volumes client support
1 parent 2047d41 commit d0f6cc3

3 files changed

Lines changed: 94 additions & 3 deletions

File tree

rootfs/api/urls.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@
111111
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/path/?$".format(settings.APP_URL_REGEX),
112112
views.AppVolumesViewSet.as_view({'patch': 'path'})),
113113
re_path(
114-
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/client/?$".format(settings.APP_URL_REGEX),
114+
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/files(/?(?P<path>[\S]+)/?)?$".format(
115+
settings.APP_URL_REGEX),
115116
views.AppVolumesViewSet.as_view({'get': 'client', 'post': 'client'})),
116117
# application resources
117118
re_path(r"^resources/services/?$", views.AppResourcesViewSet.as_view({'get': 'services'})),

rootfs/api/utils.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
import jsonschema
1717
from copy import deepcopy
1818
from django.db import models
19+
from django.conf import settings
20+
from django.core.cache import cache
21+
from requests.auth import HTTPBasicAuth
1922
from requests_toolbelt import user_agent
2023
from api import __version__ as drycc_version
2124
from rest_framework.exceptions import ValidationError
25+
from scheduler import KubeException
2226

2327
logger = logging.getLogger(__name__)
2428

@@ -196,6 +200,78 @@ def validate_json(value, schema, raise_exception=ValidationError):
196200
return value
197201

198202

203+
class VolumeClient(object):
204+
205+
def __init__(self, app_id, volume, scheduler):
206+
self.bind = ":9000"
207+
self.path = "/data"
208+
self.app_id = app_id
209+
self.volume = volume
210+
self.scheduler = scheduler
211+
212+
@property
213+
def server(self):
214+
_server = cache.get(self.cache_key, None)
215+
if not _server or not self.health(_server):
216+
self.cleanup()
217+
k8s_volume = {"name": self.volume.name}
218+
if self.volume.type == "csi":
219+
k8s_volume.update({"persistentVolumeClaim": {"claimName": self.volume.name}})
220+
else:
221+
k8s_volume.update(self.volume.parameters)
222+
username, password = random_string(32), random_string(32)
223+
self.scheduler.pod.create(self.app_id, self.pod_name, settings.DRYCC_FILER_IMAGE, **{
224+
"args": [
225+
"filer",
226+
"--bind", self.bind, "--path", self.path,
227+
"--duration", f"{settings.DRYCC_FILER_DURATION}",
228+
"--waittime", f"{settings.DRYCC_FILER_WAITTIME}",
229+
"--username", f"{username}", "--password", f"{password}",
230+
],
231+
"app_type": "filer",
232+
"replicas": 1, "deploy_timeout": 120, "volumes": [k8s_volume],
233+
"volume_mounts": [{"mountPath": self.path, "name": self.volume.name}],
234+
"image_pull_policy": "Always",
235+
})
236+
address = self.scheduler.pod.get(self.app_id, self.pod_name).json()["status"]["podIP"]
237+
_server = {"address": address, "username": username, "password": password}
238+
cache.set(self.cache_key, _server, timeout=settings.DRYCC_FILER_DURATION)
239+
return _server
240+
241+
@property
242+
def pod_name(self):
243+
return f"drycc-filer-{self.volume.name}"
244+
245+
@property
246+
def cache_key(self):
247+
return f"filer:{self.pod_name}"
248+
249+
def health(self, server):
250+
try:
251+
return self.request(
252+
"OPTIONS", server, timeout=2).headers.get('server') == 'drycc-filer'
253+
except requests.exceptions.Timeout:
254+
return False
255+
256+
def cleanup(self):
257+
try:
258+
self.scheduler.pod.delete(self.app_id, self.pod_name)
259+
except KubeException:
260+
pass
261+
262+
def request(self, method, server, path="/", **kwargs):
263+
cache.touch(self.cache_key, timeout=settings.DRYCC_FILER_DURATION)
264+
url = f"http://{server["address"]}:{self.bind.split(":")[1]}/{path}"
265+
kwargs["auth"] = HTTPBasicAuth(server["username"], server["password"])
266+
return get_session().request(method, url, **kwargs)
267+
268+
def get(self, path, **kwargs):
269+
return self.request("GET", self.server, path, **kwargs)
270+
271+
def post(self, path, **kwargs):
272+
return self.request("POST", self.server, path, **kwargs)
273+
274+
199275
if __name__ == "__main__":
200276
import doctest
201277
doctest.testmod()

rootfs/api/views.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from rest_framework.permissions import IsAuthenticated, AllowAny
2424
from rest_framework.response import Response
2525
from rest_framework.viewsets import GenericViewSet
26+
from rest_framework.decorators import parser_classes
27+
from rest_framework.parsers import MultiPartParser
2628

2729
from api import monitor, models, permissions, serializers, viewsets, authentication
2830
from api.tasks import scale_app, restart_app, mount_app, downstream_model_owner
@@ -35,7 +37,7 @@
3537
from social_django.utils import psa
3638
from social_django.views import _do_login
3739
from social_core.utils import setting_name
38-
from api import admissions
40+
from api import admissions, utils
3941
from api.backend import OauthCacheManager
4042
from api.apps_extra.social_core.actions import do_auth, do_complete
4143

@@ -824,8 +826,20 @@ def get_object(self):
824826
app__id=self.kwargs['id'],
825827
name=self.kwargs['name'])
826828

829+
@method_decorator(parser_classes([MultiPartParser]))
827830
def client(self, request, **kwargs):
828-
pass
831+
path = self.kwargs.get('path', '')
832+
volume = self.get_object()
833+
client = utils.VolumeClient(volume.app.id, volume, volume.app.scheduler())
834+
if request.method == "GET":
835+
response = client.get(path, stream=True)
836+
return StreamingHttpResponse(
837+
status=response.status_code, streaming_content=response.iter_content(),
838+
content_type=response.headers.get('Content-Type', 'application/octet-stream'))
839+
elif request.method == "POST":
840+
client.post(path, files=request.FILES)
841+
return HttpResponse(status=status.HTTP_204_NO_CONTENT)
842+
return HttpResponse(status=status.HTTP_405_METHOD_NOT_ALLOWED)
829843

830844
def expand(self, request, **kwargs):
831845
volume = self.get_object()

0 commit comments

Comments
 (0)