-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy path__init__.py
More file actions
146 lines (119 loc) · 4.94 KB
/
__init__.py
File metadata and controls
146 lines (119 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import logging
import random
import requests_mock
import time
import unittest
from os.path import dirname, realpath
from django.test.runner import DiscoverRunner
from django.contrib.auth import get_user_model
from rest_framework.test import APITestCase, APITransactionTestCase
from api.models.base import Token
from api.models.workspace import Workspace, WorkspaceMember
# User model class returned by Django's get_user_model(), resolved once at import time
User = get_user_model()
# Mock out router requests and add in some jitter.
# Used by the checks that verify an application is available in the router.
def fake_responses(request, context):
    """Return a randomly chosen fake router response after a random delay.

    The function has the ``(request, context)`` signature of a dynamic
    response callback: it records the chosen status on *context* and returns
    the body text.

    Args:
        request: The intercepted request. It is not inspected.
        context: Response context object; its ``status_code`` and ``reason``
            attributes are set to match the chosen response.

    Returns:
        The body text of the chosen response (the same string that is stored
        as the reason).
    """
    responses = (
        # 404 is listed four times to increase the chance of picking it
        {'text': 'Not Found', 'status_code': 404},
        {'text': 'Not Found', 'status_code': 404},
        {'text': 'Not Found', 'status_code': 404},
        {'text': 'Not Found', 'status_code': 404},
        {'text': 'OK', 'status_code': 200},
        {'text': 'Gateway timeout', 'status_code': 504},
        {'text': 'Bad gateway', 'status_code': 502},
    )
    # Pick one entry directly; no need to shuffle a fresh list and pop from it
    response = random.choice(responses)

    context.status_code = response['status_code']
    context.reason = response['text']

    # Random float between 1 and 4 seconds for some sleep jitter
    # (random.uniform may return either endpoint)
    time.sleep(random.uniform(1, 4))

    return response['text']
# Shared mock adapter: GET requests to the root and health-check paths are all
# answered by fake_responses
adapter = requests_mock.Adapter()
for _path in ('/', '/health', '/healthz'):
    adapter.register_uri('GET', _path, text=fake_responses)

# Directory containing this file; root of the test directory (for files and such)
TEST_ROOT = dirname(realpath(__file__))
class SilentDjangoTestSuiteRunner(DiscoverRunner):
    """Test runner that keeps api log messages off the console during tests."""

    def run_tests(self, test_labels, **kwargs):
        """Disable logging up to ERROR, then delegate to DiscoverRunner.

        ``test_labels`` and any keyword arguments are passed through
        unchanged, and the parent runner's result is returned.
        """
        # logging.disable() silences the given level and everything below it,
        # so only CRITICAL messages still get through
        logging.disable(logging.ERROR)
        return super().run_tests(test_labels, **kwargs)
class DryccBaseTestCase(unittest.TestCase):
    """Shared helpers for the API test cases.

    The helpers read ``self.client`` (for its stored credentials and for
    issuing requests) and, as a fallback, ``self.user``; both are expected to
    be provided by the concrete test case.
    """

    def _get_authenticated_user(self):
        """Return the user the test client is currently acting as.

        If the client's stored credentials carry an ``HTTP_AUTHORIZATION``
        value of the form ``Token <key>`` and a matching ``Token`` row exists,
        that token's owner is returned. Otherwise the test case's ``user``
        attribute is returned, or ``None`` when it has none.
        """
        # Tolerate test cases without a client: getattr(None, ...) yields {}
        client = getattr(self, 'client', None)
        credentials = getattr(client, '_credentials', {})
        auth = credentials.get('HTTP_AUTHORIZATION', '')
        if auth.startswith('Token '):
            token_key = auth.split(' ', 1)[1]
            token = (
                Token.objects
                .filter(key=token_key)
                .select_related('owner')
                .first()
            )
            if token is not None:
                return token.owner
        return getattr(self, 'user', None)

    def _default_workspace_name(self):
        """Derive a workspace name from the authenticated user's username.

        The username is lower-cased and stripped of non-alphanumeric
        characters. The result is used only when it is at least five
        characters long; otherwise ``'autotest'`` is returned.
        """
        user = self._get_authenticated_user()
        if user and user.username:
            base = ''.join(ch for ch in user.username.lower() if ch.isalnum())
            # NOTE(review): the 5-character minimum presumably mirrors a
            # workspace-name validation rule -- confirm against the model
            if len(base) >= 5:
                return base
        return 'autotest'

    def _ensure_workspace_admin(self, workspace_name):
        """Make sure the workspace exists and the acting user is its admin.

        Args:
            workspace_name: Name of the workspace to fetch or create.

        Returns:
            The name of the fetched or created workspace.

        Raises:
            AssertionError: If there is neither an authenticated user nor a
                user named ``autotest``.
        """
        user = self._get_authenticated_user()
        if user is None:
            # Fall back to the well-known "autotest" account
            user = User.objects.filter(username='autotest').first()
        if user is None:
            raise AssertionError('No test user available for workspace membership')
        workspace, _ = Workspace.objects.get_or_create(
            name=workspace_name,
            defaults={'email': user.email or f'{workspace_name}@example.com'},
        )
        # update_or_create also promotes an existing membership to admin
        WorkspaceMember.objects.update_or_create(
            workspace=workspace,
            user=user,
            defaults={'role': 'admin'},
        )
        return workspace.name

    def create_app(self, name=None, workspace=None):
        """Create an app via ``POST /v2/apps`` and return its id.

        Args:
            name: Optional app id to request; omitted from the request body
                when falsy.
            workspace: Optional workspace name; defaults to the value of
                ``_default_workspace_name()``.

        Returns:
            The ``id`` field of the response data.

        The acting user is made an admin of the workspace first. The call
        asserts that the response status is 201 and that the response data
        contains ``id``.
        """
        workspace_name = workspace or self._default_workspace_name()
        workspace_name = self._ensure_workspace_admin(workspace_name)
        body = {'workspace': workspace_name}
        if name:
            body['id'] = name
        response = self.client.post('/v2/apps', body)
        self.assertEqual(response.status_code, 201, response.data)
        self.assertIn('id', response.data)
        return response.data['id']

    def get_or_create_token(self, user):
        """Return the key of a ``Token`` owned by *user*, creating one if needed.

        A newly created token is given a placeholder ``oauth`` payload.
        """
        token, _ = Token.objects.get_or_create(
            owner=user,
            defaults={
                "oauth": {
                    "access_token": "test",
                    "expires_in": 3600 * 24 * 7,
                    "token_type": "Bearer",
                    "scope": "openid",
                    "refresh_token": "test",
                }
            }
        )
        return token.key

    def assertPodContains(self, pods, app_id, ptype, version, state="up"):
        """Assert that *pods* holds a pod of the given type, release and state.

        The first pod whose ``type``, ``release`` and ``state`` match must
        also have a ``name`` containing
        ``<app_id>-<ptype>-<1 to 10 digits>-<5 lowercase alphanumerics>``.

        Args:
            pods: Iterable of mappings with ``type``, ``release``, ``state``
                and ``name`` keys.
            app_id: App id expected at the start of the pod name pattern.
            ptype: Process type to look for.
            version: Release the pod must report.
            state: State the pod must report; defaults to ``"up"``.

        Raises:
            AssertionError: If the first matching pod's name does not match
                the expected pattern.
            ValueError: If no pod matches the type, release and state.
        """
        import re  # local import: the module does not import re at file level

        for pod in pods:
            if (pod["type"] == ptype and
                    pod["release"] == version and pod["state"] == state):
                # Escape the caller-supplied parts so that characters such as
                # "." are matched literally rather than as regex syntax
                pod_name = '-'.join((
                    re.escape(app_id),
                    re.escape(ptype),
                    '[0-9]{1,10}',
                    '[a-z0-9]{5}',
                ))
                self.assertRegex(pod['name'], pod_name)
                return
        raise ValueError(
            f"pod not contains: ptype={ptype}, version={version}, state={state}"
        )
class DryccTransactionTestCase(DryccBaseTestCase, APITransactionTestCase):
    """DryccBaseTestCase helpers combined with DRF's APITransactionTestCase."""
class DryccTestCase(DryccBaseTestCase, APITestCase):
    """DryccBaseTestCase helpers combined with DRF's APITestCase."""