Skip to content

Commit 705abde

Browse files
committed
feat(gateway): add listener set support
1 parent ee15539 commit 705abde

6 files changed

Lines changed: 441 additions & 186 deletions

File tree

charts/controller/templates/controller-clusterrole.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,11 @@ rules:
7979
resources: ["serviceinstances", "servicebindings"]
8080
verbs: ["get", "list", "watch", "create", "delete", "patch", "update"]
8181
- apiGroups: ["gateway.networking.k8s.io"]
82-
resources: ["gateways", "httproutes", "grpcroutes", "tcproutes", "udproutes", "tlsroutes"]
82+
resources: ["gateways", "listenersets", "httproutes", "grpcroutes", "tcproutes", "udproutes", "tlsroutes"]
8383
verbs: ["get", "patch", "list", "create", "update", "delete"]
84+
- apiGroups: ["networking.k8s.io"]
85+
resources: ["networkpolicies"]
86+
verbs: ["get", "list", "create", "patch", "update", "delete"]
8487
- apiGroups: ["batch"]
8588
resources: ["jobs"]
8689
verbs: ["create"]

rootfs/api/models/gateway.py

Lines changed: 152 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import threading
44
from django.db import models
55
from django.conf import settings
6-
from django.db.models import Q
76
from django.http import Http404
87

98
from api.tasks import send_app_log
@@ -54,6 +53,10 @@ def remove(self, port, protocol):
5453
self.ports = ports
5554
return True, ""
5655

56+
@property
57+
def labels(self):
58+
return {"drycc.cc/gateway": self.name}
59+
5760
@property
5861
def addresses(self):
5962
data = self.scheduler.gateways.get(self.app.id, self.name, ignore_exception=True)
@@ -64,37 +67,26 @@ def addresses(self):
6467

6568
@property
6669
def listeners(self):
67-
auto_tls = self.app.tls_set.latest().certs_auto_enabled
6870
listeners = []
69-
domains = list(self.app.domain_set.all())
7071
for item in self.ports:
7172
port, protocol = item["port"], item["protocol"]
72-
if item["protocol"] in HOSTNAME_PROTOCOLS:
73-
for domain in domains:
74-
listener = {
75-
"allowedRoutes": {"namespaces": {"from": "All"}},
76-
"name": self._get_listener_name(port, protocol, domains.index(domain) + 1),
77-
"port": port,
78-
"hostname": domain.domain,
79-
"protocol": protocol,
80-
}
81-
secret_name = f"{self.app.id}-auto-tls" if auto_tls else (
82-
domain.certificate.certname if domain.certificate else None)
83-
if secret_name and protocol in TLS_PROTOCOLS:
84-
listener["tls"] = {
85-
"certificateRefs": [{"kind": "Secret", "name": secret_name}]}
86-
listeners.append(listener)
87-
if protocol not in TLS_PROTOCOLS:
88-
listeners.append({
89-
"allowedRoutes": {"namespaces": {"from": "All"}},
90-
"name": self._get_listener_name(port, protocol, 0),
91-
"port": port,
92-
"protocol": protocol,
93-
})
73+
listener = {
74+
"name": f"{protocol.lower()}-{port}",
75+
"port": port,
76+
"protocol": protocol,
77+
"allowedRoutes": {"namespaces": {"from": "All"}},
78+
}
79+
if protocol == "TLS":
80+
listener["tls"] = {"mode": "Passthrough"}
81+
listeners.append(listener)
9482
return listeners
9583

9684
def refresh_to_k8s(self):
97-
kwargs = {"listeners": self.listeners, "gateway_class": settings.DRYCC_APP_GATEWAY_CLASS}
85+
kwargs = {
86+
"listeners": self.listeners,
87+
"allowed_listeners": {"namespaces": {"from": "Same"}},
88+
"gateway_class": settings.DRYCC_APP_GATEWAY_CLASS
89+
}
9890
if self.app.tls_set.latest().certs_auto_enabled:
9991
kwargs["annotations"] = {"cert-manager.io/issuer": self.app.id}
10092
try:
@@ -104,7 +96,10 @@ def refresh_to_k8s(self):
10496
kwargs["version"] = data["metadata"]["resourceVersion"]
10597
response = self.scheduler.gateways.patch(self.app.id, self.name, **kwargs)
10698
if response.status_code == 409:
107-
raise ServiceUnavailable(f'Kubernetes gateway could not be patched: {response.status_code} {response.reason}, please retry') # noqa
99+
raise ServiceUnavailable(
100+
f'Kubernetes gateway could not be patched: '
101+
f'{response.status_code} {response.reason}'
102+
)
108103
else:
109104
logger.debug("delete k8s resource when listeners are empty")
110105
self.scheduler.gateways.delete(
@@ -118,6 +113,10 @@ def refresh_to_k8s(self):
118113
logger.debug("skip creating k8s resource when listeners are empty")
119114
except KubeException as e:
120115
raise ServiceUnavailable('Kubernetes gateway could not be created') from e
116+
for item in self.ports:
117+
if item["protocol"] in HOSTNAME_PROTOCOLS:
118+
self._refresh_listener_set(item["protocol"], item["port"])
119+
self._cleanup_unused_listener_sets()
121120

122121
def change_default_tls(self):
123122
if self.name != self.app.id:
@@ -135,6 +134,9 @@ def save(self, *args, **kwargs):
135134
self.refresh_to_k8s()
136135

137136
def delete(self, *args, **kwargs):
137+
self.ports = []
138+
self._cleanup_unused_listener_sets()
139+
138140
try:
139141
self.scheduler.gateways.delete(self.app.id, self.name, ignore_exception=False)
140142
except KubeException:
@@ -167,14 +169,97 @@ def _check_port(self, port, protocol):
167169
return False
168170
return True
169171

170-
def _get_listener_name(self, port, protocol, index):
171-
if protocol in ("TCP", "TLS", "HTTP"):
172-
protocol = "TCP"
173-
elif protocol in ("HTTPS", ):
174-
protocol = "MIX"
175-
else:
176-
protocol = "UDP"
177-
return "-".join([protocol, str(port), str(index)]).lower()
172+
def _listener_set_name(self, protocol, port):
173+
return f"{self.name}-{protocol.lower()}-{port}"
174+
175+
def _listener_entry_name(self, domain_str):
176+
return domain_str.replace(".", "-")
177+
178+
def _cleanup_unused_listener_sets(self):
179+
expected = {
180+
self._listener_set_name(item["protocol"], item["port"])
181+
for item in self.ports
182+
if item["protocol"] in HOSTNAME_PROTOCOLS
183+
}
184+
try:
185+
response = self.scheduler.listenersets.get(self.app.id, labels=self.labels)
186+
for listener_set in response.json().get("items", []):
187+
listener_set_name = listener_set["metadata"]["name"]
188+
if listener_set_name not in expected:
189+
self.scheduler.listenersets.delete(self.app.id, listener_set_name)
190+
except KubeException:
191+
self.log('Failed to list ListenerSets for cleanup', level=logging.WARN)
192+
193+
def _refresh_listener_set(self, protocol, port):
194+
listener_set_name = self._listener_set_name(protocol, port)
195+
listeners = self._build_listener_set_listeners(protocol, port)
196+
if not listeners:
197+
self.scheduler.listenersets.delete(self.app.id, listener_set_name)
198+
return
199+
kwargs = {
200+
"listeners": listeners,
201+
"labels": self.labels,
202+
"parent_ref": {
203+
"group": "gateway.networking.k8s.io",
204+
"kind": "Gateway",
205+
"name": self.name,
206+
},
207+
}
208+
209+
auto_tls = self.app.tls_set.latest().certs_auto_enabled
210+
if auto_tls and protocol in TLS_PROTOCOLS:
211+
kwargs["annotations"] = {"cert-manager.io/issuer": self.app.id}
212+
try:
213+
try:
214+
data = self.scheduler.listenersets.get(self.app.id, listener_set_name).json()
215+
kwargs["version"] = data["metadata"]["resourceVersion"]
216+
response = self.scheduler.listenersets.patch(
217+
self.app.id, listener_set_name, **kwargs
218+
)
219+
if response.status_code == 409:
220+
self.log(
221+
f'ListenerSet {listener_set_name} conflict during patch, please retry',
222+
level=logging.WARN,
223+
)
224+
except KubeException:
225+
if "version" in kwargs:
226+
kwargs.pop("version")
227+
self.scheduler.listenersets.create(self.app.id, listener_set_name, **kwargs)
228+
except KubeException as e:
229+
raise ServiceUnavailable(
230+
f'ListenerSet {listener_set_name} could not be created/updated') from e
231+
232+
def _build_listener_set_listeners(self, protocol, port):
233+
auto_tls = self.app.tls_set.latest().certs_auto_enabled
234+
listeners = []
235+
for domain in self.app.domain_set.all():
236+
listener = {
237+
"name": self._listener_entry_name(domain.domain),
238+
"port": port,
239+
"protocol": protocol,
240+
"hostname": domain.domain,
241+
"allowedRoutes": {"namespaces": {"from": "All"}},
242+
}
243+
if protocol in TLS_PROTOCOLS:
244+
secret_name = (
245+
f"{self.app.id}-auto-tls" if auto_tls
246+
else (domain.certificate.certname if domain.certificate else None)
247+
)
248+
if protocol == "TLS":
249+
if secret_name:
250+
listener["tls"] = {
251+
"mode": "Terminate",
252+
"certificateRefs": [{"kind": "Secret", "name": secret_name}],
253+
}
254+
else:
255+
listener["tls"] = {"mode": "Passthrough"}
256+
elif secret_name:
257+
listener["tls"] = {
258+
"mode": "Terminate",
259+
"certificateRefs": [{"kind": "Secret", "name": secret_name}],
260+
}
261+
listeners.append(listener)
262+
return listeners
178263

179264
class Meta:
180265
get_latest_by = 'created'
@@ -218,11 +303,6 @@ def protocols(self):
218303
raise NotImplementedError("this kind is not supported")
219304
return self.PROTOCOLS_CHOICES[self.kind]
220305

221-
@property
222-
def hostnames(self):
223-
return [domain.domain for domain in self.app.domain_set.filter(
224-
ptype__in=[s.ptype for s in self.services])]
225-
226306
@property
227307
def cleaned_rules(self):
228308
services, rules = self.services, []
@@ -238,15 +318,6 @@ def cleaned_rules(self):
238318
rules.append(rule)
239319
return rules
240320

241-
@property
242-
def tls_force_hostnames(self):
243-
tls = self.app.tls_set.latest()
244-
q = Q(ptype__in=[s.ptype for s in self.services])
245-
if not tls.certs_auto_enabled:
246-
q &= Q(certificate__isnull=False)
247-
domains = self.app.domain_set.filter(q)
248-
return [domain.domain for domain in domains]
249-
250321
def check_rules(self, rules):
251322
for rule in rules:
252323
for backend_ref in rule["backendRefs"]:
@@ -280,8 +351,7 @@ def refresh_to_k8s(self):
280351
if self.routable:
281352
parent_refs, http_parent_refs = self._get_all_parent_refs()
282353
tls = self.app.tls_set.latest()
283-
# requestRedirect only when has tls or certs
284-
if tls.https_enforced and self.kind == "HTTPRoute" and self.tls_force_hostnames:
354+
if tls.https_enforced and self.kind == "HTTPRoute" and http_parent_refs:
285355
self._https_enforced_to_k8s(http_parent_refs)
286356
elif self.kind == "HTTPRoute":
287357
parent_refs.extend(http_parent_refs)
@@ -373,7 +443,6 @@ def _check_parent(self, gateway_name, port):
373443
def _refresh_to_k8s(self, rules, parent_refs):
374444
manifest = {
375445
"rules": rules,
376-
"hostnames": self.hostnames,
377446
"parent_refs": parent_refs,
378447
}
379448
try:
@@ -398,7 +467,6 @@ def _https_enforced_to_k8s(self, parent_refs):
398467
}
399468
}]
400469
}],
401-
"hostnames": self.tls_force_hostnames,
402470
"parent_refs": parent_refs,
403471
}
404472
try:
@@ -416,30 +484,46 @@ def _https_enforced_to_k8s(self, parent_refs):
416484
f'Kubernetes {self.kind.lower()} could not be created') from e
417485

418486
def _get_all_parent_refs(self):
419-
gateways = {}
420-
for gateway in self.app.gateway_set.filter(
421-
name__in=[item["name"] for item in self.parent_refs]):
422-
gateways[gateway.name] = gateway
423-
hostnames, parent_refs, http_parent_refs = self.hostnames, [], []
487+
gateways = {
488+
gateway.name: gateway
489+
for gateway in self.app.gateway_set.filter(
490+
name__in=[item["name"] for item in self.parent_refs])
491+
}
492+
domains = list(self.app.domain_set.filter(ptype__in=[s.ptype for s in self.services]))
493+
parent_refs, http_parent_refs = [], []
424494
for item in self.parent_refs:
425-
gateway_name, port = item["name"], item["port"]
495+
gateway_name, gateway_port = item["name"], item["port"]
426496
if gateway_name not in gateways:
427497
continue
428498
gateway = gateways[gateway_name]
429-
for listener in gateway.listeners:
430-
hostname = listener.get('hostname', None)
431-
if listener["port"] == port and listener["protocol"] in self.protocols and (
432-
hostname is None or hostname in hostnames):
433-
parent_ref = {
499+
for port_info in gateway.ports:
500+
port, protocol = port_info["port"], port_info["protocol"]
501+
if port != gateway_port or self.kind.split("Route")[0] not in protocol:
502+
continue
503+
if protocol in HOSTNAME_PROTOCOLS and domains:
504+
listener_set_name = gateway._listener_set_name(protocol, port)
505+
for domain in domains:
506+
ref = {
507+
"group": "gateway.networking.k8s.io",
508+
"kind": "ListenerSet",
509+
"name": listener_set_name,
510+
"sectionName": gateway._listener_entry_name(domain.domain),
511+
}
512+
if protocol == "HTTP" and port == DEFAULT_HTTP_PORT:
513+
http_parent_refs.append(ref)
514+
else:
515+
parent_refs.append(ref)
516+
else:
517+
ref = {
434518
"group": "gateway.networking.k8s.io",
435519
"kind": "Gateway",
436520
"name": gateway_name,
437-
"sectionName": listener["name"],
521+
"sectionName": f"{protocol.lower()}-{port}",
438522
}
439-
if listener["protocol"] == "HTTP" and listener["port"] == DEFAULT_HTTP_PORT:
440-
http_parent_refs.append(parent_ref)
523+
if protocol == "HTTP" and port == DEFAULT_HTTP_PORT:
524+
http_parent_refs.append(ref)
441525
else:
442-
parent_refs.append(parent_ref)
526+
parent_refs.append(ref)
443527
return parent_refs, http_parent_refs
444528

445529
class Meta:

0 commit comments

Comments
 (0)