Skip to content

Commit 1361a2c

Browse files
committed
fix(channels): k8s api blocking
1 parent 8b84add commit 1361a2c

2 files changed

Lines changed: 107 additions & 52 deletions

File tree

rootfs/api/consumers.py

Lines changed: 106 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from django.conf import settings
88
from django.core.cache import cache
99

10-
from asgiref.sync import sync_to_async, async_to_sync
10+
from asgiref.sync import sync_to_async
1111

1212
from kubernetes.client import Configuration, exceptions
1313
from kubernetes.client.api import core_v1_api
@@ -67,58 +67,107 @@ def kubernetes(self):
6767

6868
class AppLogsConsumer(BaseAppConsumer):
6969

70-
async def receive(self, text_data=None, bytes_data=None):
71-
if text_data is not None:
72-
kwargs = json.loads(text_data)
73-
lines = kwargs.get("lines", 100)
74-
follow = kwargs.get("follow", False)
75-
timeout = kwargs.get("timeout", 300)
76-
url = "http://{}:{}/logs/{}?log_lines={}&follow={}&timeout={}".format(
77-
settings.LOGGER_HOST,
78-
settings.LOGGER_PORT,
79-
self.id,
80-
lines,
81-
follow,
82-
timeout,
83-
)
70+
async def connect(self):
71+
await super().connect()
72+
self.session = None
73+
self.running = False
74+
self.conneted = True
75+
76+
async def task(self, **kwargs):
77+
lines = kwargs.get("lines", 100)
78+
follow = kwargs.get("follow", False)
79+
timeout = kwargs.get("timeout", 300)
80+
url = "http://{}:{}/logs/{}?log_lines={}&follow={}&timeout={}".format(
81+
settings.LOGGER_HOST, settings.LOGGER_PORT, self.id, lines, follow, timeout,
82+
)
83+
try:
8484
async with aiohttp.ClientSession() as session:
85+
self.session = session
8586
async with session.get(url) as response:
8687
async for data in response.content.iter_any():
88+
if not self.conneted:
89+
break
8790
await self.send(text_data=data)
91+
except asyncio.TimeoutError:
92+
pass
93+
finally:
8894
await self.close(code=1000)
89-
else:
90-
raise ValueError("text_data cannot be empty!")
95+
96+
async def receive(self, text_data=None, bytes_data=None):
97+
if self.running:
98+
return
99+
self.running = True
100+
kwargs = json.loads(text_data)
101+
asyncio.create_task(self.task(**kwargs))
102+
103+
async def disconnect(self, close_code):
104+
if self.session:
105+
await self.session.close()
106+
self.conneted = False
91107

92108

93109
class AppPodLogsConsumer(BaseK8sConsumer):
94110

95111
async def connect(self):
96112
await super().connect()
113+
self.running = False
114+
self.response = None
115+
self.conneted = True
97116
self.pod_id = self.scope["url_route"]["kwargs"]["pod_id"]
98117

99-
@sync_to_async
100-
def receive(self, text_data=None, bytes_data=None):
101-
kwargs = json.loads(text_data)
118+
def reader(self, sock):
102119
try:
103-
stream = self.kubernetes.read_namespaced_pod_log(self.pod_id, self.id, **{
104-
"tail_lines": kwargs.get("lines", 100),
105-
"follow": kwargs.get("follow", False),
106-
"container": kwargs.get("container", ""),
107-
"_preload_content": False,
108-
}).stream()
109-
for line in stream:
110-
async_to_sync(self.send)(text_data=line)
111-
except exceptions.ApiException as e:
112-
async_to_sync(self.send)(text_data=str(e))
113-
async_to_sync(self.close)(code=1000)
120+
delimiter, buffer = b"\r\n", sock.read()
121+
if delimiter in buffer:
122+
index = buffer.index(delimiter)
123+
length = int(buffer[:index], base=16)
124+
if len(buffer) - (index + len(delimiter)) < length:
125+
buffer += sock.read()
126+
while buffer and self.conneted:
127+
index = buffer.index(delimiter)
128+
length = int(buffer[:index], base=16)
129+
if length == 0:
130+
asyncio.create_task(self.close(code=1000))
131+
break
132+
start_pos = index + len(delimiter)
133+
end_pos = start_pos + length + len(delimiter)
134+
asyncio.create_task(
135+
self.send(bytes_data=buffer[start_pos:end_pos].strip(delimiter)))
136+
buffer = buffer[end_pos:]
137+
except BaseException:
138+
asyncio.create_task(self.close(code=1000))
139+
140+
async def receive(self, text_data=None, bytes_data=None):
141+
if self.running:
142+
return
143+
self.running = True
144+
data = json.loads(text_data)
145+
args = (self.pod_id, self.id)
146+
kwargs = {
147+
"tail_lines": data.get("lines", 100),
148+
"follow": data.get("follow", False),
149+
"container": data.get("container", ""),
150+
"_preload_content": False,
151+
}
152+
loop = asyncio.get_event_loop()
153+
self.response = await sync_to_async(self.kubernetes.read_namespaced_pod_log)(
154+
*args, **kwargs)
155+
loop.add_reader(self.response.connection.sock, self.reader, self.response.connection.sock)
156+
157+
async def disconnect(self, close_code):
158+
if self.response:
159+
loop = asyncio.get_event_loop()
160+
loop.remove_reader(self.response.connection.sock)
161+
await sync_to_async(self.response.close)()
162+
self.conneted = False
114163

115164

116165
class AppPodExecConsumer(BaseK8sConsumer):
117166

118167
async def connect(self):
168+
await super().connect()
119169
self.stream = None
120170
self.conneted = True
121-
await super().connect()
122171
self.pod_id = self.scope["url_route"]["kwargs"]["pod_id"]
123172

124173
async def send(self, data, channel=STDOUT_CHANNEL):
@@ -131,36 +180,42 @@ async def send(self, data, channel=STDOUT_CHANNEL):
131180
elif isinstance(data, str):
132181
await super().send(text_data=channel_prefix+data)
133182

183+
async def wait(self):
184+
future, loop = asyncio.Future(), asyncio.get_event_loop()
185+
loop.add_reader(self.stream.sock, future.set_result, None)
186+
future.add_done_callback(lambda f: loop.remove_reader(self.stream.sock))
187+
await future
188+
134189
async def task(self):
135-
deadline = time.time() + settings.DRYCC_APP_POD_EXEC_TIMEOUT
136-
while self.stream.is_open() and self.conneted and time.time() < deadline:
137-
try:
138-
await sync_to_async(self.stream.update)(0.1)
139-
for channel in (ERROR_CHANNEL, STDOUT_CHANNEL, STDERR_CHANNEL):
140-
if channel in self.stream._channels:
141-
data = self.stream.read_channel(channel)
142-
await self.send(data, channel)
143-
except ssl.SSLEOFError:
144-
break
145-
await self.close(code=1000)
190+
try:
191+
deadline = time.time() + settings.DRYCC_APP_POD_EXEC_TIMEOUT
192+
while self.stream.is_open() and self.conneted and time.time() < deadline:
193+
try:
194+
await self.wait()
195+
self.stream.update()
196+
for channel in (ERROR_CHANNEL, STDOUT_CHANNEL, STDERR_CHANNEL):
197+
if channel in self.stream._channels:
198+
data = self.stream.read_channel(channel)
199+
await self.send(data, channel)
200+
except ssl.SSLEOFError:
201+
break
202+
except exceptions.ApiException as e:
203+
await self.send(str(e), STDERR_CHANNEL)
204+
finally:
205+
await self.close(code=1000)
146206

147207
async def disconnect(self, close_code):
148208
if self.stream:
149-
self.stream.close()
209+
await sync_to_async(self.stream.close)()
150210
self.conneted = False
151211

152212
async def receive(self, text_data=None, bytes_data=None):
153213
if self.stream is None and text_data is not None:
154214
args = (self.kubernetes.connect_get_namespaced_pod_exec, self.pod_id, self.id)
155215
kwargs = json.loads(text_data)
156216
kwargs.update({"stderr": True, "stdout": True, "_preload_content": False})
157-
try:
158-
self.stream = stream(*args, **kwargs)
159-
except exceptions.ApiException as e:
160-
await self.send(str(e), STDERR_CHANNEL)
161-
await self.close(code=1000)
162-
else:
163-
asyncio.create_task(self.task())
217+
self.stream = await sync_to_async(stream)(*args, **kwargs)
218+
asyncio.create_task(self.task())
164219
elif self.stream is not None:
165220
data = text_data if text_data else bytes_data
166221
channel, data = ord(data[0]), data[1:]

rootfs/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ psycopg[binary]==3.1.18
2020
pyOpenSSL==24.1.0
2121
ndg-httpsclient==0.5.1
2222
pytz==2024.1
23-
requests==2.31.0
23+
requests==2.32.0
2424
requests-toolbelt==1.0.0
2525
celery==5.3.6
2626
hiredis==2.3.2

0 commit comments

Comments
 (0)