Skip to content

Commit 02d90f2

Browse files
committed
feat(quickwit): add opensearch compatible API
1 parent 594d0fe commit 02d90f2

1 file changed

Lines changed: 65 additions & 12 deletions

File tree

rootfs/api/views.py

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,14 +1340,22 @@ class QuickwitProxyView(View):
13401340
authentication.ignore_authentication_failed = True
13411341
index_url_match = re.compile(r"^indexes/?$").match
13421342
search_url_match = re.compile(r"^(?P<index>[a-zA-Z*][\w.*-,]{0,})/search/?$").match
1343+
msearch_url_match = re.compile(r"^_elastic/_msearch/?$").match
1344+
field_caps_url_match = re.compile(r"_elastic/(?P<index>[a-zA-Z*][\w.*-,]{0,})/_field_caps/?$").match
13431345

1344-
async def get(self, request, username, path):
1345-
if match := self.search_url_match(path):
1346-
func, index = self.query, match.group("index")
1347-
elif self.index_url_match(path):
1348-
func, index = self.index, request.GET.get("index_id_patterns", "*")
1346+
async def proxy(self, request, username, path):
1347+
kwargs = {"request": request, "username": username}
1348+
if self.index_url_match(path):
1349+
func, kwargs["index"] = self.index, request.GET.get("index_id_patterns", "*")
1350+
elif match := self.search_url_match(path):
1351+
func, kwargs["index"] = self.query, match.group("index")
1352+
elif self.msearch_url_match(path):
1353+
func = self.msearch
1354+
elif match := self.field_caps_url_match(path):
1355+
func, kwargs["index"] = self.field_caps, match.group("index")
13491356
else:
13501357
raise JsonResponse({'error': 'Not Found'}, status=404)
1358+
13511359
if self.permission.has_permission(request, None):
13521360
if username != "drycc":
13531361
return JsonResponse({'error': 'Access denied'}, status=403)
@@ -1357,12 +1365,11 @@ async def get(self, request, username, path):
13571365
return JsonResponse({'error': 'Unauthorized'}, status=401)
13581366
if auth[0].username != username:
13591367
return JsonResponse({'error': 'Access denied'}, status=403)
1360-
else:
1361-
index = await self.get_app_indexes(username, index)
1362-
return await func(request, index)
1368+
return await func(**kwargs)
13631369

1364-
async def index(self, request, index):
1370+
async def index(self, request, username, index):
13651371
base_url = settings.QUICKWIT_SEARCHER_URL
1372+
index = await self.get_app_indexes(username, index)
13661373
url, params = urljoin(base_url, "/api/v1/indexes"), dict(request.GET)
13671374
params["index_id_patterns"] = index
13681375
try:
@@ -1373,8 +1380,9 @@ async def index(self, request, index):
13731380
data, status = {'error': f'quickwit connection failed: {str(e)}'}, 502
13741381
return JsonResponse(data, status=status, safe=False)
13751382

1376-
async def query(self, request, index):
1383+
async def query(self, request, username, index):
13771384
base_url = settings.QUICKWIT_SEARCHER_URL
1385+
index = await self.get_app_indexes(username, index)
13781386
url, params = urljoin(base_url, f"/api/v1/{index}/search"), dict(request.GET)
13791387
try:
13801388
async with aiohttp.ClientSession() as session:
@@ -1384,18 +1392,63 @@ async def query(self, request, index):
13841392
data, status = {'error': f'quickwit connection failed: {str(e)}'}, 502
13851393
return JsonResponse(data, status=status)
13861394

1395+
async def msearch(self, request, username):
1396+
base_url = settings.QUICKWIT_SEARCHER_URL
1397+
json_lines = request.body.decode('utf-8').strip().split('\n')
1398+
for i, json_line in enumerate(json_lines):
1399+
if i % 2 == 0:
1400+
request_header = json.loads(json_line)
1401+
request_header['index'] = ",".join(
1402+
[await self.get_app_indexes(username, i) for i in request_header['index']]
1403+
).split(",")
1404+
json_lines[i] = json.dumps(request_header)
1405+
url, params = urljoin(
1406+
base_url, f"/api/v1/_elastic/_msearch"), dict(request.GET)
1407+
try:
1408+
async with aiohttp.ClientSession() as session:
1409+
async with session.post(
1410+
url, data="\n".join(json_lines), params=params, timeout=self.timeout
1411+
) as response:
1412+
data, status = await response.json(), response.status
1413+
except aiohttp.ClientError as e:
1414+
data, status = {'error': f'quickwit connection failed: {str(e)}'}, 502
1415+
return JsonResponse(data, status=status)
1416+
1417+
async def field_caps(self, request, username, index):
1418+
base_url = settings.QUICKWIT_SEARCHER_URL
1419+
index = await self.get_app_indexes(username, index)
1420+
url, params = urljoin(
1421+
base_url, f"/api/v1/_elastic/{index}/_field_caps"), dict(request.GET)
1422+
try:
1423+
async with aiohttp.ClientSession() as session:
1424+
async with session.get(url, params=params, timeout=self.timeout) as response:
1425+
data, status = await response.json(), response.status
1426+
except aiohttp.ClientError as e:
1427+
data, status = {'error': f'quickwit connection failed: {str(e)}'}, 502
1428+
return JsonResponse(data, status=status)
1429+
13871430
async def get_app_indexes(self, username, index):
1431+
if username == "drycc":
1432+
return index
13881433
if "," in index:
13891434
match = re.compile("|".join([f"^{i}$" for i in index.split(",")])).match
13901435
else:
13911436
match = re.compile(index).match
13921437
app_indexes, log_index_prefix = [], settings.QUICKWIT_LOG_INDEX_PREFIX
1393-
async for app in models.app.App.objects.filter(owner__username=username):
1394-
app_index = f"{log_index_prefix}{app.id}"
1438+
if hasattr(self, "app_ids"):
1439+
app_ids = self.app_ids
1440+
else:
1441+
app_ids = [
1442+
app.id async for app in models.app.App.objects.filter(owner__username=username)]
1443+
setattr(self, "app_ids", app_ids)
1444+
for app_id in app_ids:
1445+
app_index = f"{log_index_prefix}{app_id}"
13951446
if match(app_index):
13961447
app_indexes.append(app_index)
13971448
return ",".join(app_indexes)
13981449

1450+
get = post = proxy
1451+
13991452

14001453
@method_decorator(csrf_exempt, name='dispatch')
14011454
class PrometheusProxyView(View):

0 commit comments

Comments
 (0)