Skip to content

Commit 2447453

Browse files
committed
feat(ps): add pod logs support
1 parent bd47d28 commit 2447453

22 files changed

Lines changed: 3135 additions & 195 deletions

charts/controller/templates/controller-clusterrole.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,8 @@ rules:
8282
- apiGroups: ["gateway.networking.k8s.io"]
8383
resources: ["gateways", "httproutes", "grocroutes", "tcproutes", "udproutes", "tlsroutes"]
8484
verbs: ["get", "patch", "list", "create", "update", "delete"]
85+
- apiGroups: ["batch"]
86+
resources: ["jobs"]
87+
verbs: ["create"]
8588
{{- end -}}
8689

charts/controller/templates/controller-webhook-register.yaml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,27 @@
66
apiVersion: admissionregistration.k8s.io/v1
77
kind: MutatingWebhookConfiguration
88
metadata:
9-
name: drycc-controller-webhook
9+
name: {{ $altName2 }}
1010
webhooks:
11-
- name: "{{ $altName2 }}"
11+
- name: {{ $altName2 }}
1212
sideEffects: None
1313
admissionReviewVersions: ["v1"]
1414
clientConfig:
1515
caBundle: {{ b64enc $ca.Cert }}
1616
service:
1717
name: drycc-controller-webhook
1818
namespace: "{{ .Release.Namespace }}"
19-
path: "{{ printf "/v2/webhooks/scale/%s/" $token }}"
19+
path: "{{ printf "/v2/webhooks/%s/" $token }}"
2020
port: 8443
2121
failurePolicy: Fail
22+
objectSelector:
23+
matchLabels:
24+
heritage: drycc
2225
rules:
26+
- operations: ["UPDATE"]
27+
apiGroups: ["batch"]
28+
apiVersions: ["*"]
29+
resources: ["jobs/status"]
2330
- operations: ["UPDATE"]
2431
apiGroups: ["apps"]
2532
apiVersions: ["*"]

rootfs/api/admissions.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from django.db.models import F, Func, Value, JSONField
2+
from rest_framework.request import Request
3+
4+
from api import models
5+
6+
7+
class BaseHandler(object):
8+
9+
def detect(self, request: Request) -> bool:
10+
raise NotImplementedError()
11+
12+
def handle(self, request: Request) -> bool:
13+
raise NotImplementedError()
14+
15+
16+
class JobsStatusHandler(BaseHandler):
17+
18+
def detect(self, request: Request) -> bool:
19+
group = request.get("resource", {}).get("group", None)
20+
resource = "/".join([
21+
request.get("resource", {}).get("resource", None),
22+
request.get("subResource", ""),
23+
])
24+
if (group, resource) == ("batch", "jobs/status"):
25+
return True
26+
return False
27+
28+
def handle(self, request: Request) -> bool:
29+
app_id = request["object"]["metadata"]["namespace"]
30+
app = models.app.App.objects.filter(id=app_id).first()
31+
container_type = request["object"]["metadata"].get("labels", {}).get("type", "")
32+
if app and container_type:
33+
status = request["object"]["status"]
34+
replicas = request["object"]["spec"].get("replicas", 0)
35+
if "active" in status:
36+
replicas += 1
37+
elif "succeeded" in status or "failed" in status:
38+
replicas -= 1
39+
replicas = 0 if replicas < 0 else replicas
40+
if app.structure.get(container_type, 0) != replicas:
41+
models.app.App.objects.filter(id=app.id).update(
42+
structure=Func(
43+
F("structure"),
44+
Value([container_type]),
45+
Value(replicas, JSONField()),
46+
function="jsonb_set",
47+
)
48+
)
49+
return True
50+
51+
52+
class DeploymentsScaleHandler(BaseHandler):
53+
54+
def detect(self, request: Request) -> bool:
55+
group = request.get("resource", {}).get("group", None)
56+
resource = "/".join([
57+
request.get("resource", {}).get("resource", None),
58+
request.get("subResource", ""),
59+
])
60+
if (group, resource) == ("apps", "deployments/scale"):
61+
return True
62+
return False
63+
64+
def handle(self, request: Request) -> bool:
65+
app_id = request["object"]["metadata"]["namespace"]
66+
app = models.app.App.objects.filter(id=app_id).first()
67+
container_type = None
68+
for item in request["object"]["status"]["selector"].split(","):
69+
key, value = item.split("=")
70+
if key == "type":
71+
container_type = value
72+
if app and container_type:
73+
replicas = request["object"]["spec"].get("replicas", 0)
74+
if app.structure.get(container_type, 0) != replicas:
75+
models.app.App.objects.filter(id=app.id).update(
76+
structure=Func(
77+
F("structure"),
78+
Value([container_type]),
79+
Value(replicas, JSONField()),
80+
function="jsonb_set",
81+
)
82+
)
83+
return True

rootfs/api/consumers.py

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import collections
88
from django.conf import settings
99

10-
from asgiref.sync import sync_to_async
10+
from asgiref.sync import sync_to_async, async_to_sync
1111

12-
from kubernetes.client import Configuration
12+
from kubernetes.client import Configuration, exceptions
1313
from kubernetes.client.api import core_v1_api
1414
from kubernetes.stream import stream
1515
from kubernetes.stream.ws_client import STDOUT_CHANNEL, STDERR_CHANNEL, ERROR_CHANNEL
@@ -27,12 +27,16 @@
2727

2828
class BaseAppConsumer(AsyncWebsocketConsumer):
2929

30-
async def has_perm(self):
30+
@database_sync_to_async
31+
def has_perm(self):
3132
if self.scope["user"] is None:
3233
return False, "user not login"
3334
request = Request(self.scope["user"], "POST")
34-
app = await database_sync_to_async(App.objects.get)(id=self.id)
35-
return await database_sync_to_async(has_app_permission)(request, app)
35+
try:
36+
app = App.objects.get(id=self.id)
37+
return has_app_permission(request, app)
38+
except App.DoesNotExist:
39+
return False, "user not exists"
3640

3741
async def connect(self):
3842
self.id = self.scope["url_route"]["kwargs"]["id"]
@@ -43,6 +47,21 @@ async def connect(self):
4347
raise DenyConnection(message)
4448

4549

50+
class BaseK8sConsumer(BaseAppConsumer):
51+
52+
@property
53+
def kubernetes(self):
54+
with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as token_file:
55+
token = token_file.read()
56+
config = Configuration(host=settings.SCHEDULER_URL)
57+
config.api_key = {"authorization": "Bearer " + token}
58+
config.verify_ssl = settings.K8S_API_VERIFY_TLS
59+
if config.verify_ssl:
60+
config.ssl_ca_cert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
61+
Configuration.set_default(config)
62+
return core_v1_api.CoreV1Api()
63+
64+
4665
class AppLogsConsumer(BaseAppConsumer):
4766

4867
async def receive(self, text_data=None, bytes_data=None):
@@ -68,19 +87,30 @@ async def receive(self, text_data=None, bytes_data=None):
6887
raise ValueError("text_data cannot be empty!")
6988

7089

71-
class AppPodExecConsumer(BaseAppConsumer):
90+
class AppPodLogsConsumer(BaseK8sConsumer):
7291

73-
@property
74-
def kubernetes(self):
75-
with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as token_file:
76-
token = token_file.read()
77-
config = Configuration(host=settings.SCHEDULER_URL)
78-
config.api_key = {"authorization": "Bearer " + token}
79-
config.verify_ssl = settings.K8S_API_VERIFY_TLS
80-
if config.verify_ssl:
81-
config.ssl_ca_cert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
82-
Configuration.set_default(config)
83-
return core_v1_api.CoreV1Api()
92+
async def connect(self):
93+
await super().connect()
94+
self.pod_id = self.scope["url_route"]["kwargs"]["pod_id"]
95+
96+
@sync_to_async
97+
def receive(self, text_data=None, bytes_data=None):
98+
kwargs = json.loads(text_data)
99+
try:
100+
stream = self.kubernetes.read_namespaced_pod_log(self.pod_id, self.id, **{
101+
"tail_lines": kwargs.get("lines", 100),
102+
"follow": kwargs.get("follow", False),
103+
"container": kwargs.get("container", ""),
104+
"_preload_content": False,
105+
}).stream()
106+
for line in stream:
107+
async_to_sync(self.send)(text_data=line)
108+
except exceptions.ApiException as e:
109+
async_to_sync(self.send)(text_data=str(e))
110+
async_to_sync(self.close)(code=1000)
111+
112+
113+
class AppPodExecConsumer(BaseK8sConsumer):
84114

85115
async def connect(self):
86116
self.stream = None
@@ -121,8 +151,13 @@ async def receive(self, text_data=None, bytes_data=None):
121151
args = (self.kubernetes.connect_get_namespaced_pod_exec, self.pod_id, self.id)
122152
kwargs = json.loads(text_data)
123153
kwargs.update({"stderr": True, "stdout": True, "_preload_content": False})
124-
self.stream = stream(*args, **kwargs)
125-
asyncio.create_task(self.task())
154+
try:
155+
self.stream = stream(*args, **kwargs)
156+
except exceptions.ApiException as e:
157+
await self.send(str(e), STDERR_CHANNEL)
158+
await self.close(code=1000)
159+
else:
160+
asyncio.create_task(self.task())
126161
elif self.stream is not None:
127162
data = text_data if text_data else bytes_data
128163
channel, data = ord(data[0]), data[1:]

0 commit comments

Comments
 (0)