-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathpermissions.py
More file actions
104 lines (89 loc) · 3.84 KB
/
permissions.py
File metadata and controls
104 lines (89 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import requests
import logging
import urllib.parse
from django.conf import settings
from django.core.cache import cache
from rest_framework import permissions
from api import manager
from api.models import blocklist
from api.models.workspace import Workspace, WorkspaceMember
logger = logging.getLogger(__name__)
def get_app_status(app):
block = blocklist.Blocklist.get_blocklist(app)
if block:
return False, block.remark
if settings.WORKFLOW_MANAGER_URL:
status = manager.WorkspaceAPI().get_status(app.workspace_id)
if not status["is_active"]:
return False, status["message"]
return True, None
class IsOwner(permissions.BasePermission):
"""
Object-level permission to allow only owners of an object to access it.
Assumes the model instance has an `owner` attribute.
"""
def has_object_permission(self, request, view, obj):
if hasattr(obj, 'owner'):
return obj.owner == request.user
else:
return False
class IsAppUser(permissions.BasePermission):
"""
Object-level permission to allow only users who are owners
or collaborators of an app to access it.
"""
def has_object_permission(self, request, view, obj):
if request.user.is_superuser or request.user.is_staff:
return True
elif getattr(obj, "user", None) == request.user:
return True
elif isinstance(obj, Workspace) or hasattr(obj, 'workspace') or hasattr(obj, 'app'):
workspace = obj if isinstance(obj, Workspace) else getattr(
obj, "workspace", None) or getattr(getattr(obj, 'app', None), 'workspace', None)
if request.method in ["GET", "HEAD", "OPTIONS"]:
allowed_roles = ["viewer", "member", "admin"]
elif request.method in ["POST", "PUT", "PATCH"]:
allowed_roles = ["member", "admin"]
else:
allowed_roles = ["admin"]
return WorkspaceMember.objects.filter(
workspace=workspace, user=request.user, role__in=allowed_roles,
).exists()
return False
class HasOAuthScope(permissions.BasePermission):
"""
Object-level permission to allow only requests with specific OAuth scopes.
The required scopes are defined on the view as `required_oauth_scopes = ['scope1', 'scope2']`
"""
def has_permission(self, request, view):
required_oauth_scopes = getattr(view, 'required_oauth_scopes', [])
if not required_oauth_scopes:
return True
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
parts = auth_header.split()
if len(parts) == 2 and parts[0].lower() == 'bearer':
token = parts[1]
else:
return False
scopes = self.get_token_scopes(token)
return set(required_oauth_scopes).issubset(scopes)
def get_token_scopes(self, token):
def _get_scopes():
endpoint = getattr(settings, 'SOCIAL_AUTH_DRYCC_OIDC_ENDPOINT', None)
if not endpoint:
return set()
oauth_introspect_url = urllib.parse.urljoin(endpoint + "/", "introspect/")
key = getattr(settings, 'SOCIAL_AUTH_DRYCC_KEY', '')
secret = getattr(settings, 'SOCIAL_AUTH_DRYCC_SECRET', '')
try:
resp = requests.post(
oauth_introspect_url, auth=(key, secret), data={'token': token}, timeout=5)
if resp.status_code == 200:
data = resp.json()
if data.get("active"):
return set(data.get("scope", "").split())
except Exception as e:
logger.info(f"Error introspecting token: {e}")
return set()
return cache.get_or_set(
f"drycc_oauth_scopes_v2_{token}", _get_scopes, settings.DRYCC_CACHE_USER_TIME)