-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathbase.py
More file actions
138 lines (114 loc) · 4.99 KB
/
base.py
File metadata and controls
138 lines (114 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
from scheduler.exceptions import KubeHTTPException
from scheduler.resources import Resource
# Byte-size settings. Each one is read from the environment variable of the
# same name and parsed with int(); the second argument to get() is the
# fallback used when the variable is not set.
# NOTE(review): nothing in the visible part of this file reads these values;
# presumably they are consumed by other ingress implementations - confirm.
MEM_REQUEST_BODY_BYTES = int(os.environ.get('MEM_REQUEST_BODY_BYTES', 1024))  # default: 1 KiB
MAX_REQUEST_BODY_BYTES = int(os.environ.get('MAX_REQUEST_BODY_BYTES', 1024 * 1024 * 1024))  # default: 1 GiB
MEM_RESPONSE_BODY_BYTES = int(os.environ.get('MEM_RESPONSE_BODY_BYTES', 1024 * 1024))  # default: 1 MiB
MAX_RESPONSE_BODY_BYTES = int(os.environ.get('MAX_RESPONSE_BODY_BYTES', 1024 * 1024 * 1024))  # default: 1 GiB
class BaseIngress(Resource):
    """Client for ``networking.k8s.io/v1`` Ingress objects.

    Builds Ingress manifests and issues the get/create/put/delete requests
    for them through the HTTP helpers inherited from ``Resource``.
    """

    # NOTE(review): presumably keeps the Resource machinery from registering
    # this class directly - confirm against the Resource implementation.
    abstract = True
    api_version = 'networking.k8s.io/v1'
    api_prefix = 'apis'
    short_name = 'ingress'
    # Path written into every generated rule; subclasses may override it.
    ingress_path = "/"
    # Value emitted as spec.ingressClassName by manifest().
    ingress_class = None

    def manifest(self, namespace, ingress, **kwargs):
        """Build the Ingress manifest as a plain dict.

        Args:
            namespace: Namespace written to ``metadata.namespace``.
            ingress: Name of the Ingress; also used as the name of the
                backend service every rule points at.
            **kwargs: Optional ``hosts`` (iterable of host names, one rule
                is generated per host), ``tls`` (stored unchanged as
                ``spec.tls``) and ``version`` (stored as
                ``metadata.resourceVersion``). Falsy values are omitted.

        Returns:
            The manifest dict, ready to be sent as a JSON body.
        """
        hosts = kwargs.pop("hosts", None)
        tls = kwargs.pop("tls", None)
        version = kwargs.pop("version", None)

        metadata = {
            "name": ingress,
            "namespace": namespace,
            "annotations": {
                "kubernetes.io/tls-acme": "true"
            }
        }
        spec = {
            "ingressClassName": self.ingress_class
        }
        if hosts:
            # One rule per host, all routed to port 80 of the service that
            # shares the Ingress name.
            spec["rules"] = [{
                "host": host,
                "http": {
                    "paths": [
                        {
                            "path": self.ingress_path,
                            "pathType": "Prefix",
                            "backend": {
                                "service": {
                                    "name": ingress,
                                    "port": {
                                        "number": 80
                                    }
                                }
                            }
                        }
                    ]
                }
            } for host in hosts]
        if tls:
            spec["tls"] = tls
        if version:
            metadata["resourceVersion"] = version

        return {
            "kind": "Ingress",
            "apiVersion": self.api_version,
            "metadata": metadata,
            "spec": spec,
        }

    def get(self, namespace, ingress=None, ignore_exception=False, **kwargs):
        """
        Fetch a single Ingress or a list of Ingresses

        With ``ingress`` given, the named Ingress is requested; otherwise
        the namespace's Ingress collection is requested. Remaining keyword
        arguments are turned into query parameters via ``query_params``.

        Raises:
            KubeHTTPException: if the response status is unhealthy and
                ``ignore_exception`` is false.
        """
        if ingress is not None:
            url = self.api("/namespaces/{}/ingresses/{}", namespace, ingress)
            message = 'get Ingress ' + ingress
        else:
            url = self.api("/namespaces/{}/ingresses", namespace)
            message = 'get Ingresses'
        response = self.http_get(url, params=self.query_params(**kwargs))
        if not ignore_exception and self.unhealthy(response.status_code):
            raise KubeHTTPException(response, message)
        return response

    def create(self, namespace, ingress, ignore_exception=False, **kwargs):
        """POST a new Ingress built by ``manifest`` and return the response.

        Raises:
            KubeHTTPException: if the response status is unhealthy and
                ``ignore_exception`` is false.
        """
        url = self.api("/namespaces/{}/ingresses", namespace)
        data = self.manifest(namespace, ingress, **kwargs)
        response = self.http_post(url, json=data)
        if not ignore_exception and self.unhealthy(response.status_code):
            # Name the Ingress itself (not only the namespace) so the error
            # identifies the object that failed.
            raise KubeHTTPException(
                response,
                'create Ingress "{}" in Namespace "{}"'.format(ingress, namespace)
            )
        return response

    def put(self, namespace, ingress, version, ignore_exception=False, **kwargs):
        """PUT a replacement manifest for an existing Ingress.

        ``version`` is forwarded to ``manifest`` and ends up in
        ``metadata.resourceVersion``.

        Raises:
            KubeHTTPException: if the response status is unhealthy and
                ``ignore_exception`` is false.
        """
        url = self.api("/namespaces/{}/ingresses/{}", namespace, ingress)
        kwargs["version"] = version
        data = self.manifest(namespace, ingress, **kwargs)
        response = self.http_put(url, json=data)
        if not ignore_exception and self.unhealthy(response.status_code):
            raise KubeHTTPException(
                response,
                'put Ingress "{}" in Namespace "{}"'.format(ingress, namespace)
            )
        return response

    def delete(self, namespace, ingress, ignore_exception=True):
        """DELETE the named Ingress and return the response.

        Unlike the other verbs, failures are ignored by default
        (``ignore_exception=True``).

        Raises:
            KubeHTTPException: if the response status is unhealthy and
                ``ignore_exception`` is false.
        """
        url = self.api("/namespaces/{}/ingresses/{}", namespace, ingress)
        response = self.http_delete(url)
        if not ignore_exception and self.unhealthy(response.status_code):
            raise KubeHTTPException(
                response,
                'delete Ingress "{}" in Namespace "{}"'.format(ingress, namespace)
            )
        return response
class IngressClass(Resource):
    """Factory that returns an Ingress client for a named IngressClass.

    Calling an instance with an IngressClass name looks that class up in
    the cluster, maps its ``spec.controller`` to an implementation from
    ``ingress_class_map`` and returns a configured client.
    """

    short_name = 'ingress'
    # Controller name -> Ingress implementation. "default" is the fallback
    # used when the controller is unknown or the lookup did not return 200.
    ingress_class_map = {
        "default": BaseIngress
    }

    def get(self, ingress_class, ignore_exception=True):
        """Fetch the IngressClass object named ``ingress_class``.

        Failures are ignored by default so that ``__call__`` can fall back
        to the default implementation.

        Raises:
            KubeHTTPException: if the response status is unhealthy and
                ``ignore_exception`` is false.
        """
        response = self.http_get(f"/apis/networking.k8s.io/v1/ingressclasses/{ingress_class}")
        if not ignore_exception and self.unhealthy(response.status_code):
            raise KubeHTTPException(response, 'get IngressClasses "{}"', ingress_class)
        return response

    def __call__(self, ingress_class):
        """Return an Ingress client bound to ``ingress_class``.

        The implementation is chosen by the IngressClass's
        ``spec.controller``; anything other than a 200 response selects
        the "default" entry.
        """
        response = self.get(ingress_class)
        controller = "default"
        if response.status_code == 200:
            controller = response.json()["spec"]["controller"]
        ingress_cls = self.ingress_class_map.get(controller, self.ingress_class_map["default"])
        client = ingress_cls(self.url, self.k8s_api_verify_tls)
        # Bind the class name to this instance only. Assigning it on the
        # shared class object would change it for every other client of the
        # same implementation, including ones already handed out.
        client.ingress_class = ingress_class
        return client

    @classmethod
    def register(cls, ingress_name, ingress_cls):
        """Register ``ingress_cls`` under the key ``ingress_name``.

        ``__call__`` looks this map up by the controller name of the
        requested IngressClass.
        """
        cls.ingress_class_map[ingress_name] = ingress_cls
class WildcardPathIngress(BaseIngress):
    """Ingress variant whose generated rules use the path ``/*``."""

    # Overrides BaseIngress.ingress_path, the value manifest() writes into
    # the "path" field of every rule.
    ingress_path = "/*"