Skip to content

Commit af653b4

Browse files
committed
feat(tasks): add distributed async task
1 parent fcf344c commit af653b4

11 files changed

Lines changed: 187 additions & 79 deletions

File tree

charts/controller/templates/controller-deployment.yaml

Lines changed: 77 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{{ $nsqdNodeCount := .Values.nsqd.replicas | int }}
12
apiVersion: apps/v1
23
kind: Deployment
34
metadata:
@@ -53,86 +54,88 @@ spec:
5354
{{- end }}
5455
{{- end }}
5556
env:
56-
- name: REGISTRATION_MODE
57-
value: {{ .Values.registration_mode }}
58-
# NOTE(bacongobbler): use drycc/registry_proxy to work around Docker --insecure-registry requirements
59-
- name: "DRYCC_REGISTRY_PROXY_HOST"
60-
value: "127.0.0.1"
61-
# Environmental variable value for $INGRESS_CLASS
62-
- name: "DRYCC_INGRESS_CLASS"
63-
value: "{{ .Values.global.ingress_class }}"
64-
- name: "DRYCC_PLATFORM_DOMAIN"
65-
value: "{{ .Values.global.platform_domain }}"
66-
- name: "K8S_API_VERIFY_TLS"
67-
value: "{{ .Values.k8s_api_verify_tls }}"
68-
- name: "DRYCC_REGISTRY_PROXY_PORT"
69-
value: "{{ .Values.global.registry_proxy_port }}"
70-
- name: "APP_STORAGE"
71-
value: "{{ .Values.global.storage}}"
72-
- name: "DRYCC_REGISTRY_LOCATION"
73-
value: "{{ .Values.global.registry_location }}"
74-
- name: "DRYCC_REGISTRY_SECRET_PREFIX"
75-
value: "{{ .Values.global.registry_secret_prefix }}"
76-
- name: "IMAGE_PULL_POLICY"
77-
value: "{{ .Values.app_pull_policy }}"
57+
- name: REGISTRATION_MODE
58+
value: {{ .Values.registration_mode }}
59+
# NOTE(bacongobbler): use drycc/registry_proxy to work around Docker --insecure-registry requirements
60+
- name: "DRYCC_REGISTRY_PROXY_HOST"
61+
value: "127.0.0.1"
62+
# Environmental variable value for $INGRESS_CLASS
63+
- name: "DRYCC_INGRESS_CLASS"
64+
value: "{{ .Values.global.ingress_class }}"
65+
- name: "DRYCC_PLATFORM_DOMAIN"
66+
value: "{{ .Values.global.platform_domain }}"
67+
- name: "K8S_API_VERIFY_TLS"
68+
value: "{{ .Values.k8s_api_verify_tls }}"
69+
- name: "DRYCC_REGISTRY_PROXY_PORT"
70+
value: "{{ .Values.global.registry_proxy_port }}"
71+
- name: "APP_STORAGE"
72+
value: "{{ .Values.global.storage}}"
73+
- name: "DRYCC_REGISTRY_LOCATION"
74+
value: "{{ .Values.global.registry_location }}"
75+
- name: "DRYCC_REGISTRY_SECRET_PREFIX"
76+
value: "{{ .Values.global.registry_secret_prefix }}"
77+
- name: "IMAGE_PULL_POLICY"
78+
value: "{{ .Values.app_pull_policy }}"
7879
{{- if (.Values.app_storage_class) }}
79-
- name: "DRYCC_APP_KUBERNETES_STORAGE_CLASS"
80-
value: "{{ .Values.app_storage_class }}"
80+
- name: "DRYCC_APP_KUBERNETES_STORAGE_CLASS"
81+
value: "{{ .Values.app_storage_class }}"
8182
{{- end }}
82-
- name: "TZ"
83-
value: {{ .Values.time_zone | default "UTC" | quote }}
83+
- name: "TZ"
84+
value: {{ .Values.time_zone | default "UTC" | quote }}
8485
{{- if (.Values.deploy_hook_urls) }}
85-
- name: DRYCC_DEPLOY_HOOK_URLS
86-
value: "{{ .Values.deploy_hook_urls }}"
87-
- name: DRYCC_DEPLOY_HOOK_SECRET_KEY
88-
valueFrom:
89-
secretKeyRef:
90-
name: deploy-hook-key
91-
key: secret-key
86+
- name: DRYCC_DEPLOY_HOOK_URLS
87+
value: "{{ .Values.deploy_hook_urls }}"
88+
- name: DRYCC_DEPLOY_HOOK_SECRET_KEY
89+
valueFrom:
90+
secretKeyRef:
91+
name: deploy-hook-key
92+
key: secret-key
9293
{{- end }}
93-
- name: DRYCC_SECRET_KEY
94-
valueFrom:
95-
secretKeyRef:
96-
name: django-secret-key
97-
key: secret-key
98-
- name: DRYCC_BUILDER_KEY
99-
valueFrom:
100-
secretKeyRef:
101-
name: builder-key-auth
102-
key: builder-key
94+
- name: DRYCC_SECRET_KEY
95+
valueFrom:
96+
secretKeyRef:
97+
name: django-secret-key
98+
key: secret-key
99+
- name: DRYCC_BUILDER_KEY
100+
valueFrom:
101+
secretKeyRef:
102+
name: builder-key-auth
103+
key: builder-key
103104
{{- if eq .Values.global.database_location "off-cluster" }}
104-
- name: DRYCC_DATABASE_NAME
105-
valueFrom:
106-
secretKeyRef:
107-
name: database-creds
108-
key: name
109-
- name: DRYCC_DATABASE_SERVICE_HOST
110-
valueFrom:
111-
secretKeyRef:
112-
name: database-creds
113-
key: host
114-
- name: DRYCC_DATABASE_SERVICE_PORT
115-
valueFrom:
116-
secretKeyRef:
117-
name: database-creds
118-
key: port
105+
- name: DRYCC_DATABASE_NAME
106+
valueFrom:
107+
secretKeyRef:
108+
name: database-creds
109+
key: name
110+
- name: DRYCC_DATABASE_SERVICE_HOST
111+
valueFrom:
112+
secretKeyRef:
113+
name: database-creds
114+
key: host
115+
- name: DRYCC_DATABASE_SERVICE_PORT
116+
valueFrom:
117+
secretKeyRef:
118+
name: database-creds
119+
key: port
119120
{{- end }}
120-
- name: DRYCC_DATABASE_USER
121-
valueFrom:
122-
secretKeyRef:
123-
name: database-creds
124-
key: user
125-
- name: DRYCC_DATABASE_PASSWORD
126-
valueFrom:
127-
secretKeyRef:
128-
name: database-creds
129-
key: password
130-
- name: RESERVED_NAMES
131-
value: "drycc, drycc-builder, grafana"
132-
- name: WORKFLOW_NAMESPACE
133-
valueFrom:
134-
fieldRef:
135-
fieldPath: metadata.namespace
121+
- name: DRYCC_DATABASE_USER
122+
valueFrom:
123+
secretKeyRef:
124+
name: database-creds
125+
key: user
126+
- name: DRYCC_DATABASE_PASSWORD
127+
valueFrom:
128+
secretKeyRef:
129+
name: database-creds
130+
key: password
131+
- name: RESERVED_NAMES
132+
value: "drycc, drycc-builder, drycc-monitor-grafana"
133+
- name: WORKFLOW_NAMESPACE
134+
valueFrom:
135+
fieldRef:
136+
fieldPath: metadata.namespace
137+
- name: DRYCC_NSQD_ADDRS
138+
value: "{{range $i := until $nsqdNodeCount}}drycc-nsqd-{{$i}}.drycc-nsqd.{{ $.Release.Namespace }}.svc.cluster.local:{{$.Values.nsqd.tcp_port}}{{if lt (add 1 $i) $nsqdNodeCount}},{{end}}{{end}}"
136139
volumeMounts:
137140
- mountPath: /etc/slugrunner
138141
name: slugrunner-config

charts/controller/values.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ k8s_api_verify_tls: "true"
1919
# Set storageClassName, It is used for application mount.
2020
app_storage_class: ""
2121

22+
nsqd:
23+
replicas: 1
24+
tcp_port: 4150
25+
http_port: 4151
26+
2227
global:
2328
# Admin email, used for each component to send email to administrator
2429
email: "drycc@drycc.cc"

rootfs/Dockerfile.test

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ COPY requirements.txt /app/requirements.txt
44
COPY dev_requirements.txt /app/dev_requirements.txt
55

66
ENV PGDATA /var/lib/postgresql/12
7-
RUN apk add --update --virtual .build-deps \
7+
RUN echo https://dl-cdn.alpinelinux.org/alpine/edge/testing >>/etc/apk/repositories \
8+
&& apk add --update --virtual .build-deps \
89
postgresql-dev \
910
gcc \
1011
libffi-dev \
@@ -26,7 +27,8 @@ RUN apk add --update --virtual .build-deps \
2627
su-exec \
2728
bash \
2829
shadow \
29-
postgresql==12.3-r2 \
30+
nsq \
31+
postgresql \
3032
&& mkdir -p /run/postgresql $PGDATA \
3133
&& chown -R postgres:postgres /run/postgresql $PGDATA \
3234
&& apk del .build-deps \
@@ -37,4 +39,3 @@ COPY . /app
3739
WORKDIR /app
3840
CMD ["/app/bin/boot"]
3941
EXPOSE 8000
40-

rootfs/api/tests/test_build.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def test_build_no_remove_process(self, mock_requests):
349349
self.assertEqual(container['type'], 'worker')
350350
self.assertEqual(container['release'], 'v2')
351351
# pod name is auto generated so use regex
352-
self.assertRegex(container['name'], app_id + '-worker-[0-9]{8,10}-[a-z0-9]{5}')
352+
self.assertRegex(container['name'], app_id + '-worker-[0-9]{7,10}-[a-z0-9]{5}')
353353

354354
# do another deploy for this time forget Procfile
355355
url = "/v2/apps/{app_id}/builds".format(**locals())

rootfs/bin/test-unit

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66
# fail hard and fast even on pipelines
77
set -eou pipefail
88

9+
function start_nsq() {
10+
cd /tmp
11+
nohup nsqd > /var/log/nsqd.log 2>&1 &
12+
cd -
13+
}
14+
915
su-exec postgres pg_ctl -D "$PGDATA" start
16+
start_nsq
1017
python3 manage.py check
11-
coverage run manage.py test --settings=api.settings.testing --noinput api scheduler.tests
18+
coverage run manage.py test --settings=api.settings.testing --noinput tasks.tests api scheduler.tests
1219
coverage report -m

rootfs/drycc/gunicorn/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
bind = '0.0.0.0'
99
workers = int(os.environ.get('GUNICORN_WORKERS', cpu_count() * 4 + 1))
1010

11+
# Use tornado, otherwise NSQ needs to be started separately
12+
worker_class = 'tornado'
13+
1114
pythonpath = dirname(dirname(dirname(realpath(__file__))))
1215
timeout = 1200
1316
pidfile = '/tmp/gunicorn.pid'

rootfs/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ morph==0.1.4
1515
ndg-httpsclient==0.5.1
1616
packaging==20.4
1717
pyasn1==0.4.8
18+
pynsq==0.9.0
1819
psycopg2-binary==2.8.5
1920
pyldap==3.0.0.post1
2021
pyOpenSSL==19.1.0

rootfs/tasks/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .task import task, apply_async # noqa

rootfs/tasks/task.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import os
2+
import json
3+
import threading
4+
import nsq
5+
from functools import wraps
6+
7+
8+
def _message_handler(message):
9+
data = json.loads(message.body)
10+
method = TASKS[data["target_id"]]
11+
threading.Thread(
12+
target=method, args=data["args"], kwargs=data["kwargs"]).start()
13+
return True
14+
15+
16+
TASKS = {}
17+
NSQD_ADDRS = os.environ.get('DRYCC_NSQD_ADDRS', '127.0.0.1:4150').split(",")
18+
NSQ_TOPIC = os.environ.get('DRYCC_NSQ_TASKS_TOPIC', 'tasks:topic')
19+
NSQ_CHANNEL = os.environ.get('DRYCC_NSQ_TASKS_CHANNEL', 'tasks:channel')
20+
NSQD_WRITER = nsq.Writer(NSQD_ADDRS)
21+
NSQD_READER = nsq.Reader(
22+
message_handler=_message_handler,
23+
nsqd_tcp_addresses=NSQD_ADDRS,
24+
topic=NSQ_TOPIC,
25+
channel=NSQ_CHANNEL,
26+
lookupd_poll_interval=15,
27+
)
28+
29+
30+
def task(func):
31+
target_id = "%s.%s" % (func.__module__, func.__name__)
32+
TASKS[target_id] = func
33+
34+
@wraps(func)
35+
def register_task(*args, **kwargs):
36+
return func(*args, **kwargs)
37+
return register_task
38+
39+
40+
def apply_async(target, delay=0, callback=None, *args, **kwargs):
41+
target_id = "%s.%s" % (target.__module__, target.__name__)
42+
if target_id not in TASKS:
43+
raise NotImplemented("This task is not registered.")
44+
message = json.dumps({
45+
"target_id": target_id,
46+
"args": args,
47+
"kwargs": kwargs
48+
}).encode("utf-8")
49+
if delay <= 0:
50+
NSQD_WRITER.pub(NSQ_TOPIC, message, callback=callback)
51+
else:
52+
NSQD_WRITER.dpub(NSQ_TOPIC, delay, message, callback=callback)

rootfs/tasks/tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)