-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_svcat.py
More file actions
127 lines (111 loc) · 4.8 KB
/
test_svcat.py
File metadata and controls
127 lines (111 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
Unit tests for the Drycc scheduler module.
Run the tests with './manage.py test scheduler'
"""
from scheduler import KubeHTTPException
from scheduler.tests import TestCase
from scheduler.utils import generate_random_name
class ServiceCatalogTest(TestCase):
    """Tests for the scheduler's service catalog (``svcat``) resource.

    Exercises creating, fetching and deleting service instances and service
    bindings through ``self.scheduler.svcat``.

    ``self.scheduler`` and ``self.namespace`` are not defined in this class;
    presumably the ``TestCase`` base class provides them -- confirm there.

    Note on ``assertRaises(..., msg=...)``: ``msg`` is only the text reported
    when the expected exception is *not* raised. It is never compared with
    the text of the exception itself.
    """

    def create_instince(self, namespace=None, name=None, **kwargs):
        """Create a service instance and verify that the call returned 201.

        Args:
            namespace: Namespace to create the instance in. Defaults to
                ``self.namespace``.
            name: Name of the instance. A fresh random name is generated on
                every call when omitted.
            **kwargs: Overrides for the default instance settings
                (``instance_class``, ``instance_plan``, ``parameters``).

        Returns:
            The name of the created service instance.
        """
        namespace = self.namespace if namespace is None else namespace
        # Generate the name at call time. A default computed in the signature
        # is evaluated once, so every call would share the same "random" name.
        if name is None:
            name = generate_random_name()
        # these are all required even if it is kwargs...
        settings = {
            "instance_class": "server",
            "instance_plan": "1-1",
            "parameters": {
                "param-1": "value-1",
                "param-2": "value-2",
            },
        }
        # Caller-supplied values win over the defaults above.
        settings.update(kwargs)
        instance = self.scheduler.svcat.create_instance(
            namespace, name, **settings)
        self.assertEqual(instance.status_code, 201, instance.json())
        return name

    def test_create_instince_failure(self):
        """Creating an instance in the namespace 'doesnotexist' must raise."""
        with self.assertRaises(
            KubeHTTPException,
            msg='failed to create serviceinstances doesnotexist in Namespace '
                'doesnotexist: 404 Not Found'
        ):
            self.create_instince('doesnotexist', 'doesnotexist')

    def test_create_instance(self):
        """An instance can be created in an explicitly created namespace."""
        self.scheduler.ns.create("test-serviceinstance")
        self.create_instince(namespace="test-serviceinstance")

    def test_delete_instance_failure(self):
        """Deleting the instance 'foo', which is never created, must raise."""
        with self.assertRaises(
            KubeHTTPException,
            msg='failed to delete serviceinstance foo in Namespace '
                '{}: 404 Not Found'.format(self.namespace)
        ):
            self.scheduler.svcat.delete_instance(self.namespace, 'foo')

    def test_instince_delete(self):
        """Deleting an existing instance returns HTTP 200."""
        name = self.create_instince()
        response = self.scheduler.svcat.delete_instance(self.namespace, name)
        data = response.json()
        self.assertEqual(response.status_code, 200, data)

    def test_get_serviceinstances(self):
        """Listing instances returns the single instance that was created."""
        name = self.create_instince()
        response = self.scheduler.svcat.get_instance(self.namespace)
        data = response.json()
        self.assertEqual(response.status_code, 200, data)
        self.assertIn('items', data)
        self.assertEqual(1, len(data['items']), data['items'])
        # simple verify of data
        self.assertEqual(data['items'][0]['metadata']['name'], name, data)

    def test_get_serviceinstance(self):
        """Fetching an instance by name returns that instance."""
        name = self.create_instince()
        response = self.scheduler.svcat.get_instance(self.namespace, name)
        data = response.json()
        self.assertEqual(response.status_code, 200, data)
        # simple verify of data
        self.assertEqual(data['metadata']['name'], name, data)

    def test_get_serviceinstances_failure(self):
        """Fetching the instance 'doesnotexist' must raise."""
        with self.assertRaises(
            KubeHTTPException,
            msg='failed to get serviceinstance doesnotexist in Namespace '
                '{}: 404 Not Found'.format(self.namespace)
        ):
            self.scheduler.svcat.get_instance(self.namespace, 'doesnotexist')

    def create_binding(self, namespace=None, name=None, **kwargs):
        """Create a service instance plus a binding of the same name.

        Args:
            namespace: Namespace to create both objects in. Defaults to
                ``self.namespace``.
            name: Name used for the instance and the binding. A fresh random
                name is generated when omitted.
            **kwargs: Accepted for signature compatibility; currently unused.

        Returns:
            The shared name of the created instance and binding.
        """
        namespace = self.namespace if namespace is None else namespace
        # The binding is created under the name of a freshly created instance,
        # so create that instance first, in the same namespace.
        name = self.create_instince(namespace=namespace, name=name)
        binding = self.scheduler.svcat.create_binding(namespace, name)
        self.assertEqual(binding.status_code, 201, binding.json())
        return name

    def test_create_binding(self):
        """A binding can be created in an explicitly created namespace."""
        self.scheduler.ns.create("test-serviceinstance")
        self.create_binding(namespace="test-serviceinstance")

    def test_binding_delete(self):
        """Deleting an existing binding returns HTTP 200."""
        # Create in self.namespace so the delete below targets the namespace
        # that actually holds the binding.
        name = self.create_binding()
        response = self.scheduler.svcat.delete_binding(
            self.namespace, name)
        data = response.json()
        self.assertEqual(response.status_code, 200, data)

    def test_get_servicebindings(self):
        """Listing bindings returns the single binding that was created."""
        name = self.create_binding()
        response = self.scheduler.svcat.get_binding(self.namespace)
        data = response.json()
        self.assertEqual(response.status_code, 200, data)
        self.assertIn('items', data)
        self.assertEqual(1, len(data['items']), data['items'])
        # simple verify of data
        self.assertEqual(data['items'][0]['metadata']['name'], name, data)

    def test_get_servicebindings_failure(self):
        """Fetching the binding 'doesnotexist' must raise."""
        with self.assertRaises(
            KubeHTTPException,
            msg='failed to get servicebinding doesnotexist in Namespace '
                '{}: 404 Not Found'.format(self.namespace)
        ):
            self.scheduler.svcat.get_binding(self.namespace, 'doesnotexist')