Skip to content

Commit b399cd9

Browse files
committed
feat(helmbroker): add instance lock
1 parent 61bbb20 commit b399cd9

7 files changed

Lines changed: 131 additions & 144 deletions

File tree

charts/helmbroker/templates/helmbroker-clusterrole.yaml

Lines changed: 0 additions & 68 deletions
This file was deleted.

charts/helmbroker/templates/helmbroker-clusterrolebinding.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ metadata:
88
roleRef:
99
apiGroup: rbac.authorization.k8s.io
1010
kind: ClusterRole
11-
name: drycc:drycc-helmbroker
11+
name: cluster-admin
1212
subjects:
1313
- kind: ServiceAccount
1414
name: drycc-helmbroker

rootfs/helmbroker/broker.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import time
23
import shutil
34
from typing import Union, List, Optional
45

@@ -12,9 +13,10 @@
1213
UpdateDetails, UpdateServiceSpec, DeprovisionDetails, \
1314
DeprovisionServiceSpec, LastOperation, OperationState
1415

15-
from .meta import load_instance_meta, load_binding_meta
16+
from .meta import load_instance_meta, load_binding_meta, dump_instance_meta
1617
from .utils import get_instance_path, get_chart_path, get_plan_path, \
17-
get_addon_path, get_addon_updateable, get_addon_bindable
18+
get_addon_path, get_addon_updateable, get_addon_bindable, InstanceLock, \
19+
get_instance_file
1820
from .tasks import provision, bind, deprovision, update
1921
from helmbroker.meta import load_addons_meta
2022

@@ -43,10 +45,29 @@ def provision(self,
4345
if not async_allowed:
4446
raise ErrAsyncRequired()
4547
os.makedirs(instance_path, exist_ok=True)
46-
chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) # noqa
47-
addon_chart_path, addon_plan_path = get_addon_path(details.service_id, details.plan_id) # noqa
48+
chart_path, plan_path = (
49+
get_chart_path(instance_id), get_plan_path(instance_id))
50+
addon_chart_path, addon_plan_path = (
51+
get_addon_path(details.service_id, details.plan_id))
4852
shutil.copytree(addon_chart_path, chart_path)
4953
shutil.copytree(addon_plan_path, plan_path)
54+
data = {
55+
"id": instance_id,
56+
"details": {
57+
"service_id": details.service_id,
58+
"plan_id": details.plan_id,
59+
"context": details.context,
60+
"parameters": details.parameters,
61+
},
62+
"last_operation": {
63+
"state": OperationState.IN_PROGRESS.value,
64+
"description": (
65+
"provision %s in progress at %s" % (
66+
instance_id, time.time()))
67+
}
68+
}
69+
with InstanceLock(instance_id):
70+
dump_instance_meta(instance_id, data)
5071
provision.delay(instance_id, details)
5172
return ProvisionedServiceSpec(state=ProvisionState.IS_ASYNC)
5273

@@ -69,15 +90,18 @@ def bind(self,
6990
) -> Binding:
7091
is_addon_bindable = get_addon_bindable(details.service_id)
7192
if not is_addon_bindable:
72-
raise ErrBadRequest(msg="Instance %s does not bindable" % instance_id) # noqa
93+
raise ErrBadRequest(
94+
msg="Instance %s does not bindable" % instance_id)
7395
instance_meta = load_instance_meta(instance_id)
7496
if not (instance_meta and
7597
instance_meta['last_operation']['state'] == 'succeeded'):
76-
raise ErrBadRequest(msg="This instance %s is not ready" % instance_id) # noqa
98+
raise ErrBadRequest(
99+
msg="This instance %s is not ready" % instance_id)
77100
instance_path = get_instance_path(instance_id)
78101
if os.path.exists(f'{instance_path}/bind.json'):
79102
raise ErrBindingAlreadyExists()
80-
chart_path, plan_path = get_chart_path(instance_id), get_plan_path(instance_id) # noqa
103+
chart_path, plan_path = (
104+
get_chart_path(instance_id), get_plan_path(instance_id))
81105
shutil.copy(f'{plan_path}/bind.yaml', f'{chart_path}/templates')
82106
bind(instance_id, binding_id, details, async_allowed, **kwargs)
83107
data = load_binding_meta(instance_id)
@@ -111,14 +135,16 @@ def update(self,
111135
raise ErrBadRequest(msg="Instance %s does not exist" % instance_id)
112136
is_plan_updateable = get_addon_updateable(details.service_id)
113137
if not is_plan_updateable:
114-
raise ErrBadRequest(msg="Instance %s does not updateable" % instance_id) # noqa
138+
raise ErrBadRequest(
139+
msg="Instance %s does not updateable" % instance_id)
115140
if not async_allowed:
116141
raise ErrAsyncRequired()
117142
if details.plan_id is not None:
118143
plan_path = get_plan_path(instance_id)
119144
# delete the pre plan
120145
shutil.rmtree(plan_path, ignore_errors=True)
121-
_, addon_plan_path = get_addon_path(details.service_id, details.plan_id) # noqa
146+
_, addon_plan_path = get_addon_path(
147+
details.service_id, details.plan_id)
122148
# add the new plan
123149
shutil.copytree(addon_plan_path, plan_path)
124150
update.delay(instance_id, details)
@@ -130,11 +156,14 @@ def deprovision(self,
130156
async_allowed: bool,
131157
**kwargs) -> DeprovisionServiceSpec:
132158
instance_path = get_instance_path(instance_id)
133-
if not os.path.exists(instance_path):
159+
if os.path.exists(instance_path):
160+
if not os.path.exists(get_instance_file(instance_id)):
161+
return DeprovisionServiceSpec(
162+
is_async=False, operation=OperationState.SUCCEEDED)
163+
else:
134164
raise ErrInstanceDoesNotExist()
135165
if not async_allowed:
136166
raise ErrAsyncRequired()
137-
138167
deprovision.delay(instance_id)
139168
return DeprovisionServiceSpec(is_async=True)
140169

rootfs/helmbroker/cleaner.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88
from .config import INSTANCES_PATH
99
from .meta import load_instance_meta
1010
from .tasks import deprovision
11+
from .utils import get_instance_file
1112

1213
logger = logging.getLogger(__name__)
1314

1415

1516
def clean_instance():
1617
for instance_id in os.listdir(INSTANCES_PATH):
17-
if os.path.exists(os.path.join(INSTANCES_PATH, instance_id, "instance.json")): # noqa
18+
if os.path.exists(get_instance_file(instance_id)): # noqa
1819
data = load_instance_meta(instance_id)
1920
interval = time.time() - data["last_modified_time"]
2021
if interval > 3600 * 24 and data["last_operation"]["state"] != OperationState.SUCCEEDED: # noqa

rootfs/helmbroker/meta.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
import json
33
import time
44
from jsonschema import validate
5-
from .config import INSTANCES_PATH, ADDONS_PATH
5+
from .utils import get_instance_path, get_instance_file
6+
from .config import ADDONS_PATH
67

78
INSTANCE_META_SCHEMA = {
89
"type": "object",
@@ -35,7 +36,7 @@
3536

3637

3738
def load_instance_meta(instance_id):
38-
file = os.path.join(INSTANCES_PATH, instance_id, "instance.json")
39+
file = get_instance_file(instance_id)
3940
with open(file) as f:
4041
data = json.load(f)
4142
validate(instance=data, schema=INSTANCE_META_SCHEMA)
@@ -44,7 +45,7 @@ def load_instance_meta(instance_id):
4445

4546
def dump_instance_meta(instance_id, data):
4647
data["last_modified_time "] = time.time()
47-
file = os.path.join(INSTANCES_PATH, instance_id, "instance.json")
48+
file = get_instance_file(instance_id)
4849
validate(instance=data, schema=INSTANCE_META_SCHEMA)
4950
with open(file, "w") as f:
5051
f.write(json.dumps(data, sort_keys=True, indent=2))
@@ -70,7 +71,7 @@ def dump_instance_meta(instance_id, data):
7071

7172

7273
def load_binding_meta(instance_id):
73-
file = os.path.join(INSTANCES_PATH, instance_id, "binding.json")
74+
file = os.path.join(get_instance_path(instance_id), "binding.json")
7475
with open(file, 'r') as f:
7576
data = json.loads(f.read())
7677
validate(instance=data, schema=INSTANCE_META_SCHEMA)
@@ -79,7 +80,7 @@ def load_binding_meta(instance_id):
7980

8081
def dump_binding_meta(instance_id, data):
8182
data["last_modified_time "] = time.time()
82-
file = os.path.join(INSTANCES_PATH, instance_id, "binding.json")
83+
file = os.path.join(get_instance_path(instance_id), "binding.json")
8384
validate(instance=data, schema=INSTANCE_META_SCHEMA)
8485
with open(file, "w") as f:
8586
f.write(json.dumps(data, sort_keys=True, indent=2))

rootfs/helmbroker/tasks.py

Lines changed: 59 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,50 @@
11
import os
2+
import shutil
23
import time
34
import yaml
45

56
from openbrokerapi.service_broker import ProvisionDetails, OperationState, \
67
UpdateDetails, BindDetails
78

89
from .celery import app
9-
from .utils import command, get_plan_path, get_chart_path, get_cred_value
10+
from .utils import command, get_plan_path, get_chart_path, get_cred_value, \
11+
InstanceLock, get_instance_file
1012
from .meta import dump_instance_meta, dump_binding_meta, load_instance_meta
1113

1214

1315
@app.task(serializer='pickle')
1416
def provision(instance_id: str, details: ProvisionDetails):
15-
data = {
16-
"id": instance_id,
17-
"details": {
18-
"service_id": details.service_id,
19-
"plan_id": details.plan_id,
20-
"context": details.context,
21-
"parameters": details.parameters,
22-
},
23-
"last_operation": {
24-
"state": OperationState.IN_PROGRESS.value,
25-
"description": "provision %s in progress at %s" % (instance_id, time.time()) # noqa
26-
}
27-
}
28-
dump_instance_meta(instance_id, data)
29-
chart_path = get_chart_path(instance_id)
30-
values_file = os.path.join(get_plan_path(instance_id), "values.yaml")
31-
args = [
32-
"install",
33-
details.context["instance_name"],
34-
chart_path,
35-
"--namespace",
36-
details.context["namespace"],
37-
"--create-namespace",
38-
"--wait",
39-
"--timeout",
40-
"10m0s",
41-
"-f",
42-
values_file,
43-
"--set",
44-
f"fullnameOverride={details.context['instance_name']}"
45-
]
17+
with InstanceLock(instance_id):
18+
chart_path = get_chart_path(instance_id)
19+
values_file = os.path.join(get_plan_path(instance_id), "values.yaml")
20+
args = [
21+
"install",
22+
details.context["instance_name"],
23+
chart_path,
24+
"--namespace",
25+
details.context["namespace"],
26+
"--create-namespace",
27+
"--wait",
28+
"--timeout",
29+
"10m0s",
30+
"-f",
31+
values_file,
32+
"--set",
33+
f"fullnameOverride={details.context['instance_name']}"
34+
]
4635

47-
status, output = command("helm", *args)
48-
if status != 0:
49-
data["last_operation"]["state"] = OperationState.FAILED.value
50-
data["last_operation"]["description"] = "provision error:\n%s" % output
51-
else:
52-
data["last_operation"]["state"] = OperationState.SUCCEEDED.value
53-
data["last_operation"]["description"] = "provision succeeded at %s" % time.time() # noqa
54-
dump_instance_meta(instance_id, data)
36+
status, output = command("helm", *args)
37+
data = load_instance_meta(instance_id)
38+
if status != 0:
39+
data["last_operation"]["state"] = OperationState.FAILED.value
40+
data["last_operation"]["description"] = (
41+
"provision error:\n%s" % output)
42+
else:
43+
data["last_operation"]["state"] = OperationState.SUCCEEDED.value
44+
data["last_operation"]["description"] = (
45+
"provision succeeded at %s" % time.time())
46+
print(data)
47+
dump_instance_meta(instance_id, data)
5548

5649

5750
@app.task(serializer='pickle')
@@ -158,21 +151,29 @@ def bind(instance_id: str,
158151

159152
@app.task()
160153
def deprovision(instance_id: str):
161-
data = load_instance_meta(instance_id)
162-
data["last_operation"]["state"] = OperationState.IN_PROGRESS.value
163-
data["last_operation"]["description"] = "deprovision %s in progress at %s" % (instance_id, time.time()) # noqa
164-
dump_instance_meta(instance_id, data)
165-
status, output = command(
166-
"helm",
167-
"uninstall",
168-
data["details"]["context"]["instance_name"],
169-
"--namespace",
170-
data["details"]["context"]["namespace"],
171-
)
172-
if status != 0:
173-
data["last_operation"]["state"] = OperationState.FAILED.value
174-
data["last_operation"]["description"] = "deprovision error:\n%s" % output # noqa
175-
else:
176-
data["last_operation"]["state"] = OperationState.SUCCEEDED.value
177-
data["last_operation"]["description"] = "deprovision succeeded at %s" % time.time() # noqa
178-
dump_instance_meta(instance_id, data)
154+
with InstanceLock(instance_id):
155+
data = load_instance_meta(instance_id)
156+
data["last_operation"]["state"] = OperationState.IN_PROGRESS.value
157+
data["last_operation"]["description"] = "deprovision %s in progress at %s" % (instance_id, time.time()) # noqa
158+
dump_instance_meta(instance_id, data)
159+
status, output = command(
160+
"helm",
161+
"uninstall",
162+
data["details"]["context"]["instance_name"],
163+
"--namespace",
164+
data["details"]["context"]["namespace"],
165+
)
166+
if status != 0:
167+
data["last_operation"]["state"] = OperationState.FAILED.value
168+
data["last_operation"]["description"] = (
169+
"deprovision error:\n%s" % output)
170+
shutil.copy(get_instance_file(instance_id), "%s.%s" % (
171+
get_instance_file(instance_id),
172+
time.time()
173+
))
174+
else:
175+
data["last_operation"]["state"] = OperationState.SUCCEEDED.value
176+
data["last_operation"]["description"] = (
177+
"deprovision succeeded at %s" % time.time())
178+
os.remove(get_instance_file(instance_id))
179+
dump_instance_meta(instance_id, data)

0 commit comments

Comments
 (0)