Skip to content

Commit 6ad0ca9

Browse files
committed
chore(volumes): add add filer delete
1 parent cd9b997 commit 6ad0ca9

6 files changed

Lines changed: 192 additions & 116 deletions

File tree

rootfs/api/filer.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import uuid
2+
import requests
3+
from django.conf import settings
4+
from django.core.cache import cache
5+
from requests.auth import HTTPBasicAuth
6+
7+
from .utils import random_string, get_session, CacheLock
8+
9+
10+
class FilerClient(object):
11+
12+
def __init__(self, app_id, volume, scheduler):
13+
self.bind = ":9000"
14+
self.path = "/data"
15+
self.app_id = app_id
16+
self.volume = volume
17+
self.scheduler = scheduler
18+
19+
@property
20+
def server(self):
21+
lock_key = f"filer:lock:{self.app_id}:{self.volume.name}"
22+
lock = CacheLock(lock_key)
23+
try:
24+
lock.acquire()
25+
_server = cache.get(self.cache_key, None)
26+
if not _server or not self.health(_server):
27+
_server = self.get_server()
28+
cache.set(self.cache_key, _server, timeout=settings.DRYCC_FILER_DURATION)
29+
finally:
30+
lock.release()
31+
return _server
32+
33+
@property
34+
def cache_key(self):
35+
return f"filer:{self.app_id}:{self.volume.name}"
36+
37+
def get_server(self):
38+
pod_name = f"drycc-filer-{uuid.uuid4().hex}"
39+
k8s_volume = {"name": self.volume.name}
40+
if self.volume.type == "csi":
41+
k8s_volume.update({"persistentVolumeClaim": {"claimName": self.volume.name}})
42+
else:
43+
k8s_volume.update(self.volume.parameters)
44+
username, password = random_string(32), random_string(32)
45+
self.scheduler.pod.create(self.app_id, pod_name, settings.DRYCC_FILER_IMAGE, **{
46+
"args": [
47+
"filer",
48+
"--bind", self.bind, "--path", self.path,
49+
"--duration", f"{settings.DRYCC_FILER_DURATION}",
50+
"--waittime", f"{settings.DRYCC_FILER_WAITTIME}",
51+
"--username", f"{username}", "--password", f"{password}",
52+
],
53+
"labels": {"app": self.app_id, "pod": pod_name, "volume": self.volume.name},
54+
"app_type": "filer", "replicas": 1, "deploy_timeout": 120, "volumes": [k8s_volume],
55+
"volume_mounts": [{"mountPath": self.path, "name": self.volume.name}],
56+
"restart_policy": "Never", "image_pull_policy": "Always",
57+
})
58+
address = self.scheduler.pod.get(self.app_id, pod_name).json()["status"]["podIP"]
59+
return {"address": address, "username": username, "password": password}
60+
61+
def health(self, server):
62+
try:
63+
return self.request(
64+
"OPTIONS", server, timeout=2).headers.get('server') == 'drycc-filer'
65+
except requests.exceptions.Timeout:
66+
return False
67+
68+
def request(self, method, server, path="/", **kwargs):
69+
cache.touch(self.cache_key, timeout=settings.DRYCC_FILER_DURATION)
70+
url = f"http://{server["address"]}:{self.bind.split(":")[1]}/{path}"
71+
kwargs["auth"] = HTTPBasicAuth(server["username"], server["password"])
72+
return get_session().request(method, url, **kwargs)
73+
74+
def get(self, path, **kwargs):
75+
return self.request("GET", self.server, path, **kwargs)
76+
77+
def post(self, path, **kwargs):
78+
return self.request("POST", self.server, path, **kwargs)
79+
80+
def delete(self, path, **kwargs):
81+
return self.request("DELETE", self.server, path, **kwargs)

rootfs/api/monitor.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import time
2+
import aiohttp
23
import requests
34
from typing import Iterator, Dict
45
from contextlib import closing
@@ -114,29 +115,27 @@
114115
"""
115116

116117

117-
def last_metrics(namespace):
118+
async def last_metrics(namespace):
118119
if not settings.DRYCC_METRICS_CONFIG:
119-
return ()
120+
return
120121
promql = query_last_metrics_promql_tpl % (
121-
'|'.join(settings.DRYCC_METRICS_CONFIG.keys()),
122-
namespace,
123-
'1m'
124-
)
122+
'|'.join(settings.DRYCC_METRICS_CONFIG.keys()), namespace, '1m')
123+
url = urljoin(settings.DRYCC_PROMETHEUS_URL, "/api/v1/query")
125124
params = {"query": promql, "start": int(time.time() - 60)}
126-
response = requests.get(
127-
urljoin(settings.DRYCC_PROMETHEUS_URL, "/api/v1/query"),
128-
params=params
129-
)
130-
if response.status_code != 200:
131-
return ()
132-
return ('%s{%s} %s\n' % (
133-
item['metric']['__name__'],
134-
','.join([
135-
f'{key}="{value}"' for key, value in item['metric'].items()
136-
if key in settings.DRYCC_METRICS_CONFIG[item['metric']['__name__']]
137-
]),
138-
item['value'][1],
139-
) for item in response.json()['data']['result'])
125+
async with aiohttp.ClientSession() as session:
126+
async with session.get(url, params=params) as response:
127+
if response.status != 200:
128+
return
129+
items = await response.json()
130+
for item in items['data']['result']:
131+
yield '%s{%s} %s\n' % (
132+
item['metric']['__name__'],
133+
','.join([
134+
f'{key}="{value}"' for key, value in item['metric'].items()
135+
if key in settings.DRYCC_METRICS_CONFIG[item['metric']['__name__']]
136+
]),
137+
item['value'][1]
138+
)
140139

141140

142141
def query_loadbalancer(namespaces: Iterator[str],

rootfs/api/urls.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,15 @@
110110
re_path(
111111
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/path/?$".format(settings.APP_URL_REGEX),
112112
views.AppVolumesViewSet.as_view({'patch': 'path'})),
113+
# application filer
113114
re_path(
114-
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/files(/?(?P<path>[\S]+)/?)?$".format(
115+
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/client/?$".format(
115116
settings.APP_URL_REGEX),
116-
views.AppVolumesViewSet.as_view({'get': 'client', 'post': 'client'})),
117+
views.AppFilerClientViewSet.as_view({'get': 'list', 'post': 'create'})),
118+
re_path(
119+
r"^apps/(?P<id>{})/volumes/(?P<name>[-_\w]+)/client/(?P<path>[\S]+)$".format(
120+
settings.APP_URL_REGEX),
121+
views.AppFilerClientViewSet.as_view({'get': 'retrieve', 'delete': 'destroy'})),
117122
# application resources
118123
re_path(r"^resources/services/?$", views.AppResourcesViewSet.as_view({'get': 'services'})),
119124
re_path(

rootfs/api/utils.py

Lines changed: 39 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
"""
44
import os
55
import re
6+
import uuid
7+
import time
68
import base64
79
import string
810
import concurrent
@@ -16,13 +18,12 @@
1618
import jsonschema
1719
from copy import deepcopy
1820
from django.db import models
19-
from django.conf import settings
2021
from django.core.cache import cache
21-
from requests.auth import HTTPBasicAuth
22+
from asgiref.sync import sync_to_async
2223
from requests_toolbelt import user_agent
2324
from api import __version__ as drycc_version
2425
from rest_framework.exceptions import ValidationError
25-
from scheduler import KubeException
26+
2627

2728
logger = logging.getLogger(__name__)
2829

@@ -200,76 +201,47 @@ def validate_json(value, schema, raise_exception=ValidationError):
200201
return value
201202

202203

203-
class VolumeClient(object):
204+
class CacheLock(object):
204205

205-
def __init__(self, app_id, volume, scheduler):
206-
self.bind = ":9000"
207-
self.path = "/data"
208-
self.app_id = app_id
209-
self.volume = volume
210-
self.scheduler = scheduler
206+
def __init__(self, key):
207+
self.key = key
208+
self.value = uuid.uuid4().hex
211209

212-
@property
213-
def server(self):
214-
_server = cache.get(self.cache_key, None)
215-
if not _server or not self.health(_server):
216-
self.cleanup()
217-
k8s_volume = {"name": self.volume.name}
218-
if self.volume.type == "csi":
219-
k8s_volume.update({"persistentVolumeClaim": {"claimName": self.volume.name}})
220-
else:
221-
k8s_volume.update(self.volume.parameters)
222-
username, password = random_string(32), random_string(32)
223-
self.scheduler.pod.create(self.app_id, self.pod_name, settings.DRYCC_FILER_IMAGE, **{
224-
"args": [
225-
"filer",
226-
"--bind", self.bind, "--path", self.path,
227-
"--duration", f"{settings.DRYCC_FILER_DURATION}",
228-
"--waittime", f"{settings.DRYCC_FILER_WAITTIME}",
229-
"--username", f"{username}", "--password", f"{password}",
230-
],
231-
"app_type": "filer",
232-
"replicas": 1, "deploy_timeout": 120, "volumes": [k8s_volume],
233-
"volume_mounts": [{"mountPath": self.path, "name": self.volume.name}],
234-
"image_pull_policy": "Always",
235-
})
236-
address = self.scheduler.pod.get(self.app_id, self.pod_name).json()["status"]["podIP"]
237-
_server = {"address": address, "username": username, "password": password}
238-
cache.set(self.cache_key, _server, timeout=settings.DRYCC_FILER_DURATION)
239-
return _server
240-
241-
@property
242-
def pod_name(self):
243-
return f"drycc-filer-{self.volume.name}"
244-
245-
@property
246-
def cache_key(self):
247-
return f"filer:{self.pod_name}"
248-
249-
def health(self, server):
250-
try:
251-
return self.request(
252-
"OPTIONS", server, timeout=2).headers.get('server') == 'drycc-filer'
253-
except requests.exceptions.Timeout:
254-
return False
210+
def acquire(self, blocking=True, timeout=120):
211+
value = None
212+
for _ in range(timeout):
213+
value = cache.get_or_set(self.key, self.value, timeout)
214+
if blocking or value == self.value:
215+
break
216+
time.sleep(1)
217+
return value == self.value
255218

256-
def cleanup(self):
257-
try:
258-
self.scheduler.pod.delete(self.app_id, self.pod_name)
259-
except KubeException:
260-
pass
219+
def release(self):
220+
value = cache.get(self.key, None)
221+
if value == self.value:
222+
return
223+
cache.delete(self.key)
224+
225+
226+
class SyncIterToAsyncIter(object):
227+
228+
def __init__(self, iter_content):
229+
self.iter_content = iter_content
230+
231+
def __aiter__(self):
232+
return self
261233

262-
def request(self, method, server, path="/", **kwargs):
263-
cache.touch(self.cache_key, timeout=settings.DRYCC_FILER_DURATION)
264-
url = f"http://{server["address"]}:{self.bind.split(":")[1]}/{path}"
265-
kwargs["auth"] = HTTPBasicAuth(server["username"], server["password"])
266-
return get_session().request(method, url, **kwargs)
234+
async def __anext__(self):
235+
@sync_to_async
236+
def async_next():
237+
try:
238+
return next(self.iter_content)
239+
except StopIteration:
240+
raise StopAsyncIteration
241+
return await async_next()
267242

268-
def get(self, path, **kwargs):
269-
return self.request("GET", self.server, path, **kwargs)
270243

271-
def post(self, path, **kwargs):
272-
return self.request("POST", self.server, path, **kwargs)
244+
iter_to_aiter = SyncIterToAsyncIter
273245

274246

275247
if __name__ == "__main__":

rootfs/api/views.py

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from rest_framework.permissions import IsAuthenticated, AllowAny
2424
from rest_framework.response import Response
2525
from rest_framework.viewsets import GenericViewSet
26-
from rest_framework.decorators import parser_classes
2726
from rest_framework.parsers import MultiPartParser
2827

2928
from api import monitor, models, permissions, serializers, viewsets, authentication
@@ -33,11 +32,11 @@
3332
from django.views.decorators.cache import never_cache
3433
from django.contrib.auth import REDIRECT_FIELD_NAME
3534
from django.views.decorators.csrf import csrf_exempt
36-
from django.http.response import StreamingHttpResponse
35+
from django.http.response import FileResponse, StreamingHttpResponse
3736
from social_django.utils import psa
3837
from social_django.views import _do_login
3938
from social_core.utils import setting_name
40-
from api import admissions, utils
39+
from api import admissions, utils, filer
4140
from api.backend import OauthCacheManager
4241
from api.apps_extra.social_core.actions import do_auth, do_complete
4342

@@ -827,21 +826,6 @@ def get_object(self):
827826
app__id=self.kwargs['id'],
828827
name=self.kwargs['name'])
829828

830-
@method_decorator(parser_classes([MultiPartParser]))
831-
def client(self, request, **kwargs):
832-
path = self.kwargs.get('path', '')
833-
volume = self.get_object()
834-
client = utils.VolumeClient(volume.app.id, volume, volume.app.scheduler())
835-
if request.method == "GET":
836-
response = client.get(path, stream=True)
837-
return StreamingHttpResponse(
838-
status=response.status_code, streaming_content=response.iter_content(),
839-
content_type=response.headers.get('Content-Type', 'application/octet-stream'))
840-
elif request.method == "POST":
841-
client.post(path, files=request.FILES)
842-
return HttpResponse(status=status.HTTP_204_NO_CONTENT)
843-
return HttpResponse(status=status.HTTP_405_METHOD_NOT_ALLOWED)
844-
845829
def expand(self, request, **kwargs):
846830
volume = self.get_object()
847831
volume.expand(request.data['size'])
@@ -877,6 +861,46 @@ def path(self, request, *args, **kwargs):
877861
return Response(serializer.data)
878862

879863

864+
class AppFilerClientViewSet(BaseDryccViewSet):
865+
"""RESTful views for volumes apps with collaborators."""
866+
model = models.volume.Volume
867+
parser_classes = [MultiPartParser]
868+
869+
def get_client(self):
870+
volume = get_object_or_404(
871+
models.volume.Volume, app__id=self.kwargs['id'], name=self.kwargs['name'])
872+
return filer.FilerClient(volume.app.id, volume, volume.app.scheduler())
873+
874+
def list(self, request, **kwargs):
875+
path = request.query_params.get('path', '')
876+
client = self.get_client()
877+
results = client.get(path, params={"action": "list"}).json()
878+
# fake out pagination for now
879+
pagination = {'results': results, 'count': len(results)}
880+
return Response(data=pagination)
881+
882+
def retrieve(self, request, **kwargs):
883+
path = self.kwargs.get('path', '')
884+
client = self.get_client()
885+
response = client.get(path, stream=True, params={"action": "get"})
886+
return FileResponse(
887+
status=response.status_code,
888+
streaming_content=utils.iter_to_aiter(response.iter_content()),
889+
)
890+
891+
def create(self, request, **kwargs):
892+
path = request.data.get('path', '')
893+
client = self.get_client()
894+
response = client.post(path, files=request.FILES)
895+
return Response(data=response.content, status=response.status_code)
896+
897+
def destroy(self, request, **kwargs):
898+
path = self.kwargs.get('path', '')
899+
client = self.get_client()
900+
response = client.delete(path)
901+
return Response(data=response.content, status=response.status_code)
902+
903+
880904
class AppResourcesViewSet(AppResourceViewSet):
881905
"""RESTful views for resources apps with collaborators."""
882906
model = models.resource.Resource
@@ -1193,8 +1217,6 @@ def status(self, request, **kwargs):
11931217
@method_decorator(vary_on_headers("Authorization"))
11941218
def metric(self, request, **kwargs):
11951219
app_id = self._get_app().id
1196-
streaming_content = monitor.last_metrics(app_id)
11971220
return StreamingHttpResponse(
1198-
content_type='application/json',
1199-
streaming_content=streaming_content
1221+
streaming_content=monitor.last_metrics(app_id)
12001222
)

0 commit comments

Comments
 (0)