-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathservice.py
More file actions
158 lines (136 loc) · 5.22 KB
/
service.py
File metadata and controls
158 lines (136 loc) · 5.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import logging
from functools import partial
from django.db import models
from django.contrib.auth import get_user_model
from api.tasks import send_app_log
from api.utils import validate_json
from api.exceptions import ServiceUnavailable
from scheduler import KubeException
from .base import AuditedModel, PTYPE_MAX_LENGTH
User = get_user_model()
logger = logging.getLogger(__name__)
service_ports_schema = {
"$schema": "http://json-schema.org/schema#",
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"port": {"type": "integer"},
"protocol": {"type": "string"},
"targetPort": {"type": "integer"},
},
"required": ["name", "port", "protocol", "targetPort"],
}
}
class Service(AuditedModel):
owner = models.ForeignKey(User, on_delete=models.PROTECT)
app = models.ForeignKey('App', on_delete=models.CASCADE)
ports = models.JSONField(
default=list, validators=[partial(validate_json, schema=service_ports_schema)])
ptype = models.CharField(max_length=PTYPE_MAX_LENGTH)
class Meta:
get_latest_by = 'created'
unique_together = (('app', 'ptype'), )
ordering = ['-created']
def __str__(self):
return self.name
@property
def name(self):
if self.ptype == 'web':
svc_name = self.app.id
else:
svc_name = "{}-{}".format(self.app.id, self.ptype)
return svc_name
@property
def domain(self):
return "{}.{}.svc".format(self.name, self.namespace)
@property
def namespace(self):
return self.app.id
@classmethod
def get(cls, app, name):
for service in cls.objects.filter(app=app):
if service.name == name:
return service
raise cls.DoesNotExist()
def as_dict(self):
return {
"name": self.name,
"domain": self.domain,
"ports": self.ports,
"ptype": self.ptype,
}
def port_name(self, port, protocol):
return "-".join([protocol, str(port)]).lower()
def get_port(self, port, protocol):
for item in self.ports:
if item["port"] == port and item["protocol"] == protocol:
return item
return None
def add_port(self, port, protocol, target_port):
if self.get_port(port, protocol) is None:
self.ports.append({
"name": self.port_name(port, protocol),
"port": port,
"protocol": protocol,
"targetPort": target_port,
})
return True
return False
def update_port(self, port, protocol, target_port):
item = self.get_port(port, protocol)
if not item or item["targetPort"] != target_port:
if item and item["targetPort"] != target_port:
self.remove_port(port, protocol)
self.add_port(port, protocol, target_port)
return True
return False
def remove_port(self, port, protocol):
ports = []
for item in self.ports:
if item["port"] != port or item["protocol"] != protocol:
ports.append(item)
if len(self.ports) > len(ports):
self.ports = ports
return True
return False
def save(self, *args, **kwargs):
service = super(Service, self).save(*args, **kwargs)
self.refresh_k8s_svc()
return service
def delete(self, *args, **kwargs):
self._delete_k8s_svc(self.name)
# Delete from DB
return super(Service, self).delete(*args, **kwargs)
def log(self, message, level=logging.INFO):
"""Logs a message in the context of this service.
This prefixes log messages with an application "tag" that the customized
drycc-logspout will be on the lookout for. When it's seen, the message-- usually
an application event of some sort like releasing or scaling, will be considered
as "belonging" to the application instead of the controller and will be handled
accordingly.
"""
send_app_log.delay(self.app.id, message, level)
logger.log(level, "[{}]: {}".format(self.app.id, message))
def refresh_k8s_svc(self):
self.log('creating service: {}'.format(self.name), level=logging.DEBUG)
try:
try:
data = self.scheduler.svc.get(self.namespace, self.name).json()
self.scheduler.svc.patch(self.namespace, self.name, **{
"ports": self.ports,
"version": data["metadata"]["resourceVersion"],
"ptype": self.ptype,
})
except KubeException:
self.scheduler.svc.create(self.namespace, self.name, **{
"ports": self.ports,
"ptype": self.ptype,
})
except KubeException as e:
raise ServiceUnavailable('Kubernetes service could not be created') from e
def _delete_k8s_svc(self, svc_name):
self.log('deleting Service: {}'.format(svc_name), level=logging.DEBUG)
self.scheduler.svc.delete(self.namespace, svc_name, ignore_exception=True)