Skip to content

Commit 2835981

Browse files
committed
feat(consumers): add filer proxy
1 parent e611c43 commit 2835981

11 files changed

Lines changed: 1194 additions & 159 deletions

File tree

rootfs/api/asgi.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,24 @@
99

1010
import os
1111

12+
from django.urls import re_path
1213
from django.core.asgi import get_asgi_application
1314
from channels.routing import ProtocolTypeRouter, URLRouter
1415
from channels.security.websocket import AllowedHostsOriginValidator
1516

1617

1718
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings.production')
18-
http = get_asgi_application()
19-
20-
from .routing import websocket_urlpatterns # noqa
21-
from .middleware import ChannelOAuthMiddleware # noqa
22-
websocket = AllowedHostsOriginValidator(ChannelOAuthMiddleware(URLRouter(
23-
websocket_urlpatterns
24-
)))
25-
26-
application = ProtocolTypeRouter({"http": http, "websocket": websocket})
19+
django_asgi_app = get_asgi_application()
20+
21+
from .routing import http_urlpatterns, websocket_urlpatterns # noqa
22+
from .middleware import ChannelOAuthMiddleware, ChannelAPIVersionMiddleware # noqa
23+
24+
application = ProtocolTypeRouter({
25+
"http": ChannelAPIVersionMiddleware(URLRouter([
26+
*http_urlpatterns,
27+
re_path(r'', django_asgi_app),
28+
])),
29+
"websocket": AllowedHostsOriginValidator(ChannelOAuthMiddleware(URLRouter(
30+
websocket_urlpatterns
31+
)))
32+
})

rootfs/api/consumers.py

Lines changed: 120 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import time
33
import six
44
import ssl
5+
import aiohttp
56
import asyncio
7+
from urllib.parse import urljoin
68
from django.conf import settings
79
from django.core.cache import cache
810

@@ -14,38 +16,47 @@
1416
from kubernetes.stream import stream
1517
from kubernetes.stream.ws_client import STDOUT_CHANNEL, STDERR_CHANNEL, ERROR_CHANNEL
1618

17-
from channels.db import database_sync_to_async
1819
from channels.exceptions import DenyConnection
20+
from channels.generic.http import AsyncHttpConsumer
1921
from channels.generic.websocket import AsyncWebsocketConsumer
2022

2123
from .models.app import App
24+
from .models.volume import Volume
2225
from .permissions import has_app_permission
2326

2427

25-
class BaseAppConsumer(AsyncWebsocketConsumer):
28+
class AppPermChecker(object):
2629
timeout = 60 * 60
2730

28-
@database_sync_to_async
29-
def has_perm(self):
31+
def __init__(self, scope):
32+
self.scope = scope
33+
34+
async def has_perm(self):
3035
if self.scope["user"] is None:
3136
return False, "user not login"
32-
key = f"permission:user:{self.scope["user"].id}:app:{self.id}"
33-
permission = cache.get(key)
37+
app_id = self.scope["url_route"]["kwargs"]["id"]
38+
key = f"permission:user:{self.scope["user"].id}:app:{app_id}"
39+
permission = await cache.aget(key)
3440
if permission is None:
3541
try:
36-
app = App.objects.get(id=self.id)
37-
permission = has_app_permission(self.scope["user"], app, "GET")
42+
app = await App.objects.aget(id=app_id)
43+
permission = await sync_to_async(has_app_permission)(
44+
self.scope["user"], app, "GET")
3845
if permission[0]:
39-
cache.set(key, permission, timeout=self.timeout)
46+
await cache.aset(key, permission, timeout=self.timeout)
4047
except App.DoesNotExist:
41-
permission = (False, "user not exists")
48+
permission = (False, "app not exists")
4249
return permission
4350

51+
52+
class BaseAppConsumer(AsyncWebsocketConsumer):
53+
4454
async def connect(self):
45-
self.id = self.scope["url_route"]["kwargs"]["id"]
46-
is_ok, message = await self.has_perm()
55+
app_perm_checker = AppPermChecker(self.scope)
56+
is_ok, message = await app_perm_checker.has_perm()
4757
if is_ok:
4858
await self.accept()
59+
self.id = self.scope["url_route"]["kwargs"]["id"]
4960
else:
5061
raise DenyConnection(message)
5162

@@ -74,7 +85,7 @@ async def connect(self):
7485
self.conneted = True
7586
self.buffer = b''
7687
self.delimiter = b"\r\n"
77-
self.pod_id = self.scope["url_route"]["kwargs"]["pod_id"]
88+
self.pod_name = self.scope["url_route"]["kwargs"]["name"]
7889

7990
def reader(self, sock):
8091
self.buffer += sock.read()
@@ -98,7 +109,7 @@ async def receive(self, text_data=None, bytes_data=None):
98109
return
99110
self.running = True
100111
data = json.loads(text_data)
101-
args = (self.pod_id, self.id)
112+
args = (self.pod_name, self.id)
102113
lines = data.get("lines", 300)
103114
follow = data.get("follow", False)
104115
previous = data.get("previous", False)
@@ -134,7 +145,7 @@ async def connect(self):
134145
await super().connect()
135146
self.stream = None
136147
self.conneted = True
137-
self.pod_id = self.scope["url_route"]["kwargs"]["pod_id"]
148+
self.pod_name = self.scope["url_route"]["kwargs"]["name"]
138149

139150
async def send(self, data, channel=STDOUT_CHANNEL):
140151
channel_prefix = chr(channel)
@@ -177,7 +188,7 @@ async def disconnect(self, close_code):
177188

178189
async def receive(self, text_data=None, bytes_data=None):
179190
if self.stream is None and text_data is not None:
180-
args = (self.kubernetes.connect_get_namespaced_pod_exec, self.pod_id, self.id)
191+
args = (self.kubernetes.connect_get_namespaced_pod_exec, self.pod_name, self.id)
181192
kwargs = json.loads(text_data)
182193
kwargs.update({"stderr": True, "stdout": True, "_preload_content": False})
183194
self.stream = await sync_to_async(stream)(*args, **kwargs)
@@ -188,3 +199,96 @@ async def receive(self, text_data=None, bytes_data=None):
188199
await sync_to_async(self.stream.write_channel)(channel, data)
189200
else:
190201
raise ValueError("This operation is not supported!")
202+
203+
204+
class FilerProxyConsumer(AsyncHttpConsumer):
205+
from .middleware import ChannelOAuthMiddleware
206+
chunk_size = 64 * 1024
207+
middleware = ChannelOAuthMiddleware(None)
208+
SKIP_REQUEST_HEADERS = {
209+
'host', 'connection', 'keep-alive', 'proxy-connection', 'te', 'trailers', 'upgrade',
210+
}
211+
SKIP_RESPONSE_HEADERS = {
212+
'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers',
213+
'transfer-encoding', 'upgrade',
214+
}
215+
216+
async def handle(self, body: bytes):
217+
path = self.scope["url_route"]["kwargs"]["path"]
218+
client = await self._get_client(url_path=f"{self.scope["path"].removesuffix(path)}webdav/")
219+
if not client:
220+
await self.send_response(status=404, body=b'app or volume not found')
221+
return
222+
if path in ['_/ping', '_/bind']: # need authentication
223+
await self._handle_controller_request(client, path)
224+
return
225+
elif not path.startswith('webdav/') or (filer := await client.info()) is None:
226+
await self.send_response(status=428, body=b'filer service unavailable')
227+
return
228+
filer_target_url = "{}?{}".format(
229+
urljoin(filer["endpoint"], self.scope["path"]), self.scope.get("query_string"))
230+
method = self.scope['method'].upper()
231+
headers = {
232+
name_bytes.decode('latin-1').lower(): value_bytes.decode('latin-1')
233+
for name_bytes, value_bytes in self.scope.get('headers', [])
234+
if name_bytes.decode('latin-1').lower() not in self.SKIP_REQUEST_HEADERS
235+
}
236+
await self._handle_proxy_request(filer_target_url, method, headers, body)
237+
238+
async def _get_client(self, url_path):
239+
try:
240+
from .filer import FilerClient
241+
app = await App.objects.aget(id=self.scope["url_route"]["kwargs"]["id"])
242+
volume = await Volume.objects.filter(
243+
app=app, name=self.scope["url_route"]["kwargs"]["name"]).afirst()
244+
if not volume:
245+
return None
246+
return FilerClient(app.id, volume, url_path)
247+
except App.DoesNotExist:
248+
return None
249+
250+
async def _forward_response(self, response: aiohttp.ClientResponse):
251+
response_headers = [
252+
[name.encode('latin-1'), value.encode('latin-1')]
253+
for name, value in response.headers.items()
254+
if name.lower() not in self.SKIP_RESPONSE_HEADERS
255+
]
256+
await self.send_headers(status=response.status, headers=response_headers)
257+
async for chunk in response.content.iter_chunked(self.chunk_size):
258+
if chunk:
259+
await self.send_body(chunk, more_body=True)
260+
# Send final empty chunk to indicate end of body
261+
await self.send_body(b'', more_body=False)
262+
263+
async def _handle_proxy_request(self, url, method: str, headers: dict[str, str], data: bytes):
264+
async with aiohttp.ClientSession(
265+
timeout=aiohttp.ClientTimeout(total=settings.DRYCC_FILER_DURATION)
266+
) as session:
267+
try:
268+
async with session.request(
269+
method=method, url=url, headers=headers, data=data, allow_redirects=False
270+
) as response:
271+
await self._forward_response(response)
272+
except aiohttp.ClientError as e:
273+
await self.send_response(502, f"proxy service unavailable: {e}".encode('utf-8'))
274+
except asyncio.TimeoutError:
275+
await self.send_response(504, b'proxy request to backend filer timeout')
276+
277+
async def _handle_controller_request(self, client, path: str):
278+
await self.middleware.login(self.scope)
279+
app_perm_checker = AppPermChecker(self.scope)
280+
is_ok, message = await app_perm_checker.has_perm()
281+
status, body = 200, b''
282+
if not is_ok:
283+
status, body = 403, message.encode('utf-8')
284+
elif path == '_/ping':
285+
if (filer := await client.info()) is not None:
286+
body = b'pong'
287+
else:
288+
status, body = 503, b'filer service unavailable'
289+
elif path == '_/bind':
290+
filer = await client.bind()
291+
body = json.dumps({
292+
"username": filer["username"], "password": filer["password"],
293+
}).encode('utf-8')
294+
await self.send_response(status, body)

0 commit comments

Comments
 (0)