-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathactions.py
More file actions
171 lines (151 loc) · 6.37 KB
/
actions.py
File metadata and controls
171 lines (151 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
from urllib.parse import quote
from social_core.utils import (
partial_pipeline_data,
sanitize_redirect,
setting_url,
user_is_active,
user_is_authenticated,
)
def do_auth(backend, redirect_name="next"):
    """Start the authentication flow and remember the outgoing ``state``.

    Request fields listed in the ``FIELDS_STORED_IN_SESSION`` setting and the
    (optionally sanitized) redirect target are copied into the session, then
    ``backend.start()`` is called. When the request carries a non-empty
    ``key`` parameter and the response URL carries a ``state`` query
    parameter, the state is cached under ``"oidc_key_" + key`` for ten
    minutes.

    Args:
        backend: Authentication backend; must provide ``strategy``,
            ``setting()`` and ``start()``.
        redirect_name: Name of the request parameter holding the
            post-login redirect target.

    Returns:
        The response produced by ``backend.start()``, unchanged.
    """
    from urllib.parse import parse_qs, urlsplit

    # Save any defined next value into session
    data = backend.strategy.request_data(merge=False)

    # Save extra data into session; fields missing from the request are
    # explicitly reset to None.
    for field_name in backend.setting("FIELDS_STORED_IN_SESSION", []):
        if field_name in data:
            backend.strategy.session_set(field_name, data[field_name])
        else:
            backend.strategy.session_set(field_name, None)

    if redirect_name in data:
        # Check and sanitize a user-defined GET/POST next field value
        redirect_uri = data[redirect_name]
        if backend.setting("SANITIZE_REDIRECTS", True):
            allowed_hosts = backend.setting("ALLOWED_REDIRECT_HOSTS", []) + [
                backend.strategy.request_host()
            ]
            redirect_uri = sanitize_redirect(allowed_hosts, redirect_uri)
        backend.strategy.session_set(
            redirect_name, redirect_uri or backend.setting("LOGIN_REDIRECT_URL")
        )

    response = backend.start()

    # Be defensive about the response: it may expose no ``url`` attribute and
    # the URL may have no query string. Neither case should abort the login.
    redirect_url = getattr(response, "url", None) or ""
    state = parse_qs(urlsplit(redirect_url).query).get("state", [None])[0]

    # Only publish the state when the caller supplied a key. Caching under the
    # bare "oidc_key_" prefix would be shared by every key-less request.
    key = data.get("key")
    if key and state:
        from django.core.cache import cache

        cache.set("oidc_key_" + key, state, 60 * 10)
    return response
def do_complete(backend, login, user=None, redirect_name="next", *args, **kwargs):
    """Finish the authentication flow and build the post-login redirect.

    Runs (or resumes) the backend pipeline, logs the resulting user in through
    ``login`` when appropriate, and redirects to a URL chosen from the
    redirect value and the backend settings. If the request carries a
    ``state`` parameter and the user has a ``drycc`` social-auth record with
    extra data, the record's ``id_token`` and the username are cached under
    ``"oidc_state_" + state`` for ten minutes.

    Args:
        backend: Authentication backend; must provide ``strategy``,
            ``setting()``, ``complete()`` and ``continue_pipeline()``.
        login: Callable invoked as ``login(backend, user, social_user)``.
        user: The current request user, if any; ignored unless authenticated.
        redirect_name: Name of the session/request field holding the redirect
            target.
        *args: Forwarded to ``partial_pipeline_data`` and ``backend.complete``.
        **kwargs: Forwarded to ``partial_pipeline_data`` and
            ``backend.complete``.

    Returns:
        The pipeline result itself when it is truthy but not an instance of
        the storage's user model; otherwise the response from
        ``backend.strategy.redirect``.
    """
    data = backend.strategy.request_data()

    is_authenticated = user_is_authenticated(user)
    user = user if is_authenticated else None

    partial = partial_pipeline_data(backend, user, *args, **kwargs)
    if partial:
        user = backend.continue_pipeline(partial)
        # clean partial data after usage
        backend.strategy.clean_partial_pipeline(partial.token)
    else:
        user = backend.complete(user=user, *args, **kwargs)

    # pop redirect value before the session is trashed on login(), but after
    # the pipeline so that the pipeline can change the redirect if needed
    redirect_value = backend.strategy.session_get(redirect_name, "") or data.get(
        redirect_name, ""
    )

    # check if the output value is something else than a user and just
    # return it to the client
    user_model = backend.strategy.storage.user.user_model()
    if user and not isinstance(user, user_model):
        return user

    if is_authenticated:
        if not user:
            url = setting_url(backend, redirect_value, "LOGIN_REDIRECT_URL")
        else:
            url = setting_url(
                backend,
                redirect_value,
                "NEW_ASSOCIATION_REDIRECT_URL",
                "LOGIN_REDIRECT_URL",
            )
    elif user:
        if user_is_active(user):
            # catch is_new/social_user in case login() resets the instance
            is_new = getattr(user, "is_new", False)
            social_user = user.social_user
            login(backend, user, social_user)
            # store last login backend name in session
            backend.strategy.session_set(
                "social_auth_last_login_backend", social_user.provider
            )

            if is_new:
                url = setting_url(
                    backend,
                    "NEW_USER_REDIRECT_URL",
                    redirect_value,
                    "LOGIN_REDIRECT_URL",
                )
            else:
                url = setting_url(backend, redirect_value, "LOGIN_REDIRECT_URL")
        else:
            if backend.setting("INACTIVE_USER_LOGIN", False):
                social_user = user.social_user
                login(backend, user, social_user)
            url = setting_url(
                backend, "INACTIVE_USER_URL", "LOGIN_ERROR_URL", "LOGIN_URL"
            )
    else:
        url = setting_url(backend, "LOGIN_ERROR_URL", "LOGIN_URL")

    if redirect_value and redirect_value != url:
        redirect_value = quote(redirect_value)
        url += ("&" if "?" in url else "?") + f"{redirect_name}={redirect_value}"

    if backend.setting("SANITIZE_REDIRECTS", True):
        allowed_hosts = backend.setting("ALLOWED_REDIRECT_HOSTS", []) + [
            backend.strategy.request_host()
        ]
        url = sanitize_redirect(allowed_hosts, url) or backend.setting(
            "LOGIN_REDIRECT_URL"
        )
    response = backend.strategy.redirect(url)

    # The pipeline may have produced no user (failed login), and the request
    # may carry no ``state``. In either case there is nothing to publish, and
    # dereferencing ``user`` or concatenating ``None`` would raise after the
    # redirect has already been built.
    state = data.get("state")
    if user and state:
        # NOTE(review): with a descending ``-modified`` ordering, ``.last()``
        # appears to select the least recently modified row. Confirm whether
        # the newest row (``.first()``) was intended.
        social_auth = (
            user.social_auth.filter(provider="drycc").order_by("-modified").last()
        )
        if social_auth and social_auth.extra_data:
            extra_data = social_auth.extra_data
            # extra_data may be stored as a JSON string; decode it if so.
            if isinstance(extra_data, str):
                extra_data = json.loads(extra_data)
            from django.core.cache import cache

            cache.set(
                "oidc_state_" + state,
                {
                    "token": extra_data.get("id_token", "fail"),
                    "username": user.username,
                },
                60 * 10,
            )
    return response
def do_disconnect(
    backend, user, association_id=None, redirect_name="next", *args, **kwargs
):
    """Disconnect a social account and redirect when the backend asks for it.

    Resumes a pending partial pipeline if one exists, otherwise calls
    ``backend.disconnect`` directly. A ``dict`` result is replaced by a
    redirect response; any other result is returned unchanged.

    Args:
        backend: Authentication backend; must provide ``strategy``,
            ``setting()`` and ``disconnect()``.
        user: The user whose association is being removed.
        association_id: Optional id of the specific association to remove.
        redirect_name: Name of the request parameter holding the redirect
            target.
        *args: Forwarded to ``partial_pipeline_data`` and
            ``backend.disconnect``.
        **kwargs: Forwarded to ``partial_pipeline_data`` and
            ``backend.disconnect``.

    Returns:
        The redirect response when the disconnect result is a ``dict``,
        otherwise the disconnect result itself.
    """
    pending = partial_pipeline_data(backend, user, *args, **kwargs)
    if not pending:
        result = backend.disconnect(
            *args, user=user, association_id=association_id, **kwargs
        )
    else:
        # Inject the association id only when the stored pipeline lacks one.
        if association_id and not pending.kwargs.get("association_id"):
            pending.extend_kwargs({"association_id": association_id})
        result = backend.disconnect(*pending.args, **pending.kwargs)
        # The partial data is single-use: drop it once it has been consumed.
        backend.strategy.clean_partial_pipeline(pending.token)

    # Anything other than a dict is handed straight back to the caller.
    if not isinstance(result, dict):
        return result

    strategy = backend.strategy
    # Preference order: explicit request value, then the disconnect-specific
    # setting, then the generic login redirect.
    target = strategy.absolute_uri(
        strategy.request_data().get(redirect_name, "")
        or backend.setting("DISCONNECT_REDIRECT_URL")
        or backend.setting("LOGIN_REDIRECT_URL")
    )
    if backend.setting("SANITIZE_REDIRECTS", True):
        hosts = backend.setting("ALLOWED_REDIRECT_HOSTS", []) + [
            strategy.request_host()
        ]
        target = (
            sanitize_redirect(hosts, target)
            or backend.setting("DISCONNECT_REDIRECT_URL")
            or backend.setting("LOGIN_REDIRECT_URL")
        )
    return strategy.redirect(target)