-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbroker.py
More file actions
151 lines (137 loc) · 6.15 KB
/
broker.py
File metadata and controls
151 lines (137 loc) · 6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import shutil
from typing import Union, List, Optional
from openbrokerapi.errors import ErrInstanceAlreadyExists, ErrAsyncRequired, \
ErrBindingAlreadyExists, ErrBadRequest, ErrInstanceDoesNotExist
from openbrokerapi.service_broker import ServiceBroker, Service, \
ProvisionDetails, ProvisionedServiceSpec, ProvisionState, GetBindingSpec, \
BindDetails, Binding, BindState, UnbindDetails, UnbindSpec, \
UpdateDetails, UpdateServiceSpec, DeprovisionDetails, \
DeprovisionServiceSpec, LastOperation
from .meta import load_instance_meta, load_binding_meta
from .utils import get_instance_path, get_chart_path, get_plan_path, \
get_addon_path, get_addon_name, get_addon_updateable, get_addon_bindable
from .tasks import provision, bind, deprovision, update
from helmbroker.meta import load_addons_meta
class HelmServiceBroker(ServiceBroker):
def catalog(self) -> Union[Service, List[Service]]:
services = load_addons_meta()
return [Service(
**addons
) for _, addons in services.items()]
def provision(self,
instance_id: str,
details: ProvisionDetails,
async_allowed: bool,
**kwargs) -> ProvisionedServiceSpec:
instance_path = get_instance_path(instance_id)
if os.path.exists(instance_path):
raise ErrInstanceAlreadyExists("Instance %s already exists" % instance_id) # noqa
if not async_allowed:
raise ErrAsyncRequired()
os.makedirs(instance_path, exist_ok=True)
chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) # noqa
addon_chart_path, addon_plan_path = get_addon_path(details.service_id, details.plan_id) # noqa
shutil.copy(addon_chart_path, chart_path)
shutil.copy(addon_plan_path, plan_path)
provision.delay(instance_id, details)
return ProvisionedServiceSpec(state=ProvisionState.IS_ASYNC)
def get_binding(self,
instance_id: str,
binding_id: str,
**kwargs
) -> GetBindingSpec:
data = load_binding_meta(instance_id)
return GetBindingSpec(
data["credentials"],
)
def bind(self,
instance_id: str,
binding_id: str,
details: BindDetails,
async_allowed: bool,
**kwargs
) -> Binding:
is_addon_bindable = get_addon_bindable(instance_id)
if not is_addon_bindable:
raise ErrBadRequest("Instance %s does not bindable" % instance_id)
instance_meta = load_instance_meta(instance_id)
if not (instance_meta and
instance_meta['last_operation']['state'] == 'Ready'):
raise ErrBadRequest(msg="This instance %s is not ready" % instance_id) # noqa
if not async_allowed:
raise ErrAsyncRequired()
instance_path = get_instance_path(instance_id)
if os.path.exists(f'{instance_path}/bind.yaml'):
raise ErrBindingAlreadyExists()
chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) # noqa
addon_name = get_addon_name(details.service_id)
shutil.copy(f'{plan_path}/bind.yaml', f'{chart_path}/{addon_name}/templates') # noqa
bind.delay(instance_id, binding_id, details, async_allowed, **kwargs)
return Binding(state=BindState.IS_ASYNC)
def unbind(self,
instance_id: str,
binding_id: str,
details: UnbindDetails,
async_allowed: bool,
**kwargs
) -> UnbindSpec:
instance_path = get_instance_path(instance_id)
bind_yaml = f'{instance_path}/bind.yaml'
shutil.rmtree(bind_yaml)
return UnbindSpec(is_async=False)
def update(self,
instance_id: str,
details: UpdateDetails,
async_allowed: bool,
**kwargs
) -> UpdateServiceSpec:
instance_path = get_instance_path(instance_id)
if not os.path.exists(instance_path):
raise ErrBadRequest("Instance %s does not exist" % instance_id)
is_plan_updateable = get_addon_updateable(instance_id)
if not is_plan_updateable:
raise ErrBadRequest("Instance %s does not updateable" % instance_id) # noqa
if not async_allowed:
raise ErrAsyncRequired()
plan_path = get_plan_path(instance_id)
# delete the pre plan
shutil.rmtree(plan_path)
_, addon_plan_path = get_addon_path(details.service_id, details.plan_id) # noqa
# add the new plan
shutil.copy(addon_plan_path, plan_path)
update.delay(instance_id, details)
return UpdateServiceSpec(is_async=True)
def deprovision(self,
instance_id: str,
details: DeprovisionDetails,
async_allowed: bool,
**kwargs) -> DeprovisionServiceSpec:
instance_path = get_instance_path(instance_id)
if not os.path.exists(instance_path):
raise ErrInstanceDoesNotExist("Instance %s not exists" % instance_id) # noqa
if not async_allowed:
raise ErrAsyncRequired()
deprovision.delay(instance_id)
return DeprovisionServiceSpec(is_async=True)
def last_operation(self,
instance_id: str,
operation_data: Optional[str],
**kwargs
) -> LastOperation:
data = load_instance_meta(instance_id)
return LastOperation(
data["last_operation"]["state"],
data["last_operation"]["description"]
)
def last_binding_operation(self,
instance_id: str,
binding_id: str,
operation_data: Optional[str],
**kwargs
) -> LastOperation:
data = load_binding_meta(instance_id)
return LastOperation(
data["last_operation"]["state"],
data["last_operation"]["description"]
)